
r2dbc-postgresql's Introduction

PostgreSQL R2DBC Driver Java CI with Maven Maven Central

This project contains the PostgreSQL implementation of the R2DBC SPI. This implementation is not intended to be used directly, but rather to be used as the backing implementation for a humane client library to delegate to.

This driver provides the following features:

  • Implements R2DBC 1.0
  • Login with username/password (MD5, SASL/SCRAM) or implicit trust
  • Supports credential rotation by providing Supplier<String> or Publisher<String>
  • SCRAM authentication
  • Unix Domain Socket transport
  • Connection Fail-over supporting multiple hosts
  • TLS
  • Explicit transactions
  • Notifications
  • Logical Decode
  • Binary data transfer
  • Execution of prepared statements with bindings
  • Execution of batch statements without bindings
  • Read and write support for a majority of data types (see Data Type Mapping for details)
  • Fetching of REFCURSOR using io.r2dbc.postgresql.api.RefCursor
  • Extension points to register Codecs to handle additional PostgreSQL data types

Next steps:

  • Multi-dimensional arrays

Code of Conduct

This project is governed by the Code of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior to [email protected].

Getting Started

Here is a quick teaser of how to use R2DBC PostgreSQL in Java:

URL Connection Factory Discovery

ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:postgresql://<host>:5432/<database>");

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

Programmatic Connection Factory Discovery

Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");
options.put("statement_timeout", "5m");

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
   .option(DRIVER, "postgresql")
   .option(HOST, "...")
   .option(PORT, 5432)  // optional, defaults to 5432
   .option(USER, "...")
   .option(PASSWORD, "...")
   .option(DATABASE, "...")  // optional
   .option(OPTIONS, options) // optional
   .build());

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

// Alternative: Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

Supported ConnectionFactory Discovery Options

Option Description
ssl Enables SSL usage (SSLMode.VERIFY_FULL).
driver Must be postgresql.
protocol Protocol specifier. Empty to use single-host operations. Supported: failover for multi-server failover operations. (Optional)
host Server hostname to connect to. May contain a comma-separated list of hosts with ports when using the failover protocol.
port Server port to connect to. Defaults to 5432. (Optional)
socket Unix Domain Socket path to connect to as alternative to TCP. (Optional)
username Login username. Can be a plain String, Supplier<String>, or Publisher<String>.
password Login password. Can be a plain CharSequence, Supplier<CharSequence>, or Publisher<CharSequence>. (Optional when using TLS Certificate authentication)
database Database to select. (Optional)
applicationName The name of the application connecting to the database. Defaults to r2dbc-postgresql. (Optional)
autodetectExtensions Whether to auto-detect and register Extensions from the class path. Defaults to true. (Optional)
compatibilityMode Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to false. (Optional)
errorResponseLogLevel Log level for error responses. Any of OFF, DEBUG, INFO, WARN or ERROR. Defaults to DEBUG. (Optional)
extensions Collection of Extension to provide additional extensions when creating a connection factory. Defaults to empty. (Optional)
fetchSize The default number of rows to return when fetching results. Defaults to 0 for unlimited. (Optional)
forceBinary Whether to force binary transfer. Defaults to false. (Optional)
hostRecheckTime Host status recheck time when using multi-server operations. Defaults to 10 seconds. (Optional)
loadBalanceHosts Whether to shuffle the list of given hostnames before connecting when using multi-server operations. Defaults to true. (Optional)
loopResources TCP/Socket LoopResources (depends on the endpoint connection type). (Optional)
lockWaitTimeout Lock wait timeout. (Optional)
noticeLogLevel Log level for notice responses. Any of OFF, DEBUG, INFO, WARN or ERROR. Defaults to DEBUG. (Optional)
preferAttachedBuffers Configure whether codecs should prefer attached data buffers. The default is false, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as Json to avoid memory leaks.
preparedStatementCacheQueries Determine the number of queries that are cached in each connection. The default is -1, meaning there's no limit. The value of 0 disables the cache. Any other value specifies the cache size.
options A Map<String, String> of connection parameters. These are applied to each database connection created by the ConnectionFactory. Useful for setting generic PostgreSQL connection parameters. (Optional)
schema The search path to set. (Optional)
sslMode SSL mode to use, see SSLMode enum. Supported values: DISABLE, ALLOW, PREFER, REQUIRE, VERIFY_CA, VERIFY_FULL, TUNNEL. (Optional)
sslRootCert Path to SSL CA certificate in PEM format. Can be also a resource path. (Optional)
sslKey Path to SSL key for TLS authentication in PEM format. Can be also a resource path. (Optional)
sslCert Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path. (Optional)
sslPassword Key password to decrypt SSL key. (Optional)
sslHostnameVerifier javax.net.ssl.HostnameVerifier implementation. (Optional)
sslSni Enable/disable SNI to send the configured host name during the SSL handshake. Defaults to true. (Optional)
statementTimeout Statement timeout. (Optional)
targetServerType Type of server to use when using multi-host operations. Supported values: ANY, PRIMARY, SECONDARY, PREFER_SECONDARY. Defaults to ANY. (Optional)
tcpNoDelay Enable/disable TCP NoDelay. Enabled by default. (Optional)
tcpKeepAlive Enable/disable TCP KeepAlive. Disabled by default. (Optional)
timeZone Configure the session timezone to control conversion of local temporal representations. Defaults to TimeZone.getDefault() (Optional)
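
The username and password options above accept deferred values, which enables credential rotation. A minimal sketch using the programmatic configuration API shown below, assuming the builder exposes a Supplier-accepting password(…) overload as the table suggests:

PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
    .host("...")
    .username("...")
    .password(() -> System.getenv("DB_PASSWORD")) // Supplier<CharSequence>, re-evaluated for each new connection
    .database("...")
    .build();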

Programmatic Configuration

Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");

PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
    .host("...")
    .port(5432)  // optional, defaults to 5432
    .username("...")
    .password("...")
    .database("...")  // optional
    .options(options) // optional
    .build());

Mono<Connection> mono = connectionFactory.create();

PostgreSQL uses index parameters that are prefixed with $. The following SQL statement makes use of parameters:

INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)

Parameters are referenced using the same identifiers when binding these:

mono.flatMapMany(connection -> connection
                .createStatement("INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)")
                .bind("$1", 1)
                .bind("$2", "Walter")
                .bind("$3", "White")
                .execute());

Binding also allows positional index (zero-based) references; the parameter index is derived from the parameter discovery order when parsing the query.
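
The same insert using zero-based positional binding, as a minimal sketch (Statement.bind(int, Object) is part of the R2DBC SPI):

mono.flatMapMany(connection -> connection
                .createStatement("INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)")
                .bind(0, 1)
                .bind(1, "Walter")
                .bind(2, "White")
                .execute());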

Maven configuration

Artifacts can be found on Maven Central.

<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}</version>
</dependency>

If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}.BUILD-SNAPSHOT</version>
</dependency>

<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>Sonatype OSS Snapshot Repository</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>

Connection Fail-over

To support simple connection fail-over, it is possible to define multiple endpoints (host and port pairs) in the connection URL, separated by commas. The driver will try once to connect to each of them in order until a connection succeeds. If none succeeds, a normal connection exception is thrown. Make sure to specify the failover protocol.

The syntax for the connection URL is:

r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3

For example, an application can create two connection pools: one data source for writes, another for reads. The write pool limits connections to a primary node only:

r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primary
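
The same URLs can be passed to ConnectionFactories.get(…). A minimal sketch of the two pools' factories, assuming hypothetical hosts and credentials:

ConnectionFactory writeConnectionFactory = ConnectionFactories.get(
        "r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primary");

ConnectionFactory readConnectionFactory = ConnectionFactories.get(
        "r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=prefer_secondary");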

Cursors

R2DBC Postgres supports both the simple and the extended message flow.

Cursored fetching is activated by configuring a fetchSize. Postgres cursors are valid for the duration of a transaction. R2DBC can use cursors in auto-commit mode (Execute and Flush) to not require an explicit transaction (BEGIN…COMMIT/ROLLBACK). Newer pgpool versions don't support this feature. To work around this limitation, either use explicit transactions when configuring a fetch size or enable compatibility mode. Compatibility mode avoids cursors in auto-commit mode (Execute with no limit + Sync). Cursors in a transaction use Execute (with fetch size as limit) + Sync as message flow.
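
A minimal sketch of cursored fetching on a per-statement basis, assuming an open connection (Statement.fetchSize(…) is part of the R2DBC SPI; fetchSize can also be set globally via the connection option listed above):

connection.createStatement("SELECT * FROM person")
    .fetchSize(50) // fetch rows from the server in chunks of 50 using a cursor
    .execute();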

Listen/Notify

Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same PostgreSQL database. For Listen/Notify, two actors are involved: The sender (notify) and the receiver (listen). The following example uses two connections to illustrate how they work together:

PostgresqlConnection sender = …;
PostgresqlConnection receiver = …;

Flux<Notification> listen = receiver.createStatement("LISTEN mymessage")
    .execute()
    .flatMap(PostgresqlResult::getRowsUpdated)
    .thenMany(receiver.getNotifications());

Mono<Void> notify = sender.createStatement("NOTIFY mymessage, 'Hello World'")
    .execute()
    .flatMap(PostgresqlResult::getRowsUpdated)
    .then();

Upon subscription, the first connection enters listen mode and publishes incoming Notifications as Flux. The second connection broadcasts a notification to the mymessage channel upon subscription.

Transaction Definitions

Postgres supports additional options when starting a transaction. In particular, the following options can be specified:

  • Isolation Level (isolationLevel) (reset after the transaction to previous value)
  • Transaction Mutability (readOnly)
  • Deferrable Mode (deferrable)

These options can be specified upon transaction begin to start the transaction and apply options in a single command roundtrip:

PostgresqlConnection connection = …;

connection.beginTransaction(PostgresTransactionDefinition.from(IsolationLevel.SERIALIZABLE).readOnly().notDeferrable());

See also: https://www.postgresql.org/docs/current/sql-begin.html

JSON/JSONB support

PostgreSQL supports JSON by storing values in JSON/JSONB columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions with the io.r2dbc.postgresql.codec.Json type.

You can choose from two approaches:

  • Native JSONB encoding using the Json wrapper type.
  • Using scalar types.

The difference between the Json type and scalar types is that Json values are written encoded as JSONB to the database. byte[] and String types are represented as BYTEA and VARCHAR respectively and require casting ($1::JSON) when used with parameterized statements.

The following code shows INSERT and SELECT cases for JSON interaction:

CREATE TABLE my_table (my_json JSON);

Write JSON

connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)")
            .bind("$1", Json.of("{\"hello\": \"world\"}")).execute();

Consume JSON

connection.createStatement("SELECT my_json FROM my_table")
            .execute()
            .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", Json.class)))
            .map(Json::asString);

Write JSON using casting

connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)")
    .bind("$1", "{\"hello\": \"world\"}").execute();

Consume JSON as scalar type

connection.createStatement("SELECT my_json FROM my_table")
    .execute()
    .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", String.class)));

The following types are supported for JSON exchange:

  • io.r2dbc.postgresql.codec.Json
  • ByteBuf (must be released after usage to avoid memory leaks)
  • ByteBuffer
  • byte[]
  • String
  • InputStream (must be closed after usage to avoid memory leaks)

CITEXT support

CITEXT is a built-in extension to support case-insensitive text columns. By default, the driver sends all string values as VARCHAR, which cannot be used directly with CITEXT (without casting or converting values in your SQL).

If you cast input, then you can send parameters to the server without further customization of the driver:

CREATE TABLE test (ci CITEXT);
SELECT ci FROM test WHERE ci = $1::citext;

If you want to send individual String-values in a CITEXT-compatible way, then use Parameters.in(…):

connection.createStatement("SELECT ci FROM test WHERE ci = $1")
            .bind("$1", Parameters.in(PostgresqlObjectId.UNSPECIFIED, "Hello"))
            .execute();

If you do not have control over the created SQL or you want to send all String values in a CITEXT-compatible way, then you can customize the driver configuration by registering a StringCodec to send String values with the UNSPECIFIED OID to let Postgres infer the value type from the provided values:

Builder builder = PostgresqlConnectionConfiguration.builder();

builder.codecRegistrar((connection, allocator, registry) -> {
    registry.addFirst(new StringCodec(allocator, PostgresqlObjectId.UNSPECIFIED, PostgresqlObjectId.VARCHAR_ARRAY));
    return Mono.empty();
});

You can also register the CodecRegistrar as an Extension so that it gets auto-detected during ConnectionFactory creation.
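
A minimal sketch of such an Extension, assuming the CodecRegistrar contract used above (package names are those of recent driver versions); to enable auto-detection, list the class name in META-INF/services/io.r2dbc.postgresql.extension.Extension:

import io.netty.buffer.ByteBufAllocator;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.codec.CodecRegistry;
import io.r2dbc.postgresql.codec.PostgresqlObjectId;
import io.r2dbc.postgresql.codec.StringCodec;
import io.r2dbc.postgresql.extension.CodecRegistrar;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

// hypothetical Extension wrapping the StringCodec registration shown above
public class UnspecifiedStringCodecExtension implements CodecRegistrar {

    @Override
    public Publisher<Void> register(PostgresqlConnection connection, ByteBufAllocator allocator, CodecRegistry registry) {
        // send String values with the UNSPECIFIED OID so the server infers the value type
        registry.addFirst(new StringCodec(allocator, PostgresqlObjectId.UNSPECIFIED, PostgresqlObjectId.VARCHAR_ARRAY));
        return Mono.empty();
    }
}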

Cursors

The driver can consume cursors that were created by PL/pgSQL as refcursor. Cursors are represented as RefCursor objects. RefCursors obtained from a Result can be used to fetch the cursor contents directly. Since cursors are stateful, they must be closed once they are no longer in use.

connection.createStatement("SELECT show_cities_multiple()").execute()
    .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, RefCursor.class)))
    .flatMap(cursor -> {
        Mono<PostgresResult> data = cursor.fetch()
            .flatMap(…)
            .then(rc.close());
        return data;
    });

Logical Decode

PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data. In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

Consuming the replication stream is a four-step process:

  1. Obtain a replication connection via PostgresqlConnectionFactory.replication().
  2. Create a replication slot (physical/logical).
  3. Initiate replication using the replication slot.
  4. Once the replication stream is set up, you can consume and map the binary data using ReplicationStream.map(…).

On application shutdown, close() the ReplicationStream.

Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.

Mono<PostgresqlReplicationConnection> replicationMono = connectionFactory.replication();

// later:
ReplicationSlotRequest request = ReplicationSlotRequest.logical()
                                        .slotName("my_slot")
                                        .outputPlugin("test_decoding")
                                        .temporary()
                                        .build();
Mono<ReplicationSlot> createSlot = replicationConnection.createSlot(request);

ReplicationRequest replicationRequest = ReplicationRequest.logical()
                                        .slotName("my_slot")
                                        .startPosition(LogSequenceNumber.valueOf(0))
                                        .slotOption("skip-empty-xacts", true)
                                        .slotOption("include-xids", false)
                                        .build();

Flux<T> replicationStream = replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {
    return it.map(byteBuf -> {…})
        .doOnError(t -> it.close().subscribe());
});

Postgres Enum Types

Applications may make use of Postgres enumerated types by using EnumCodec to map custom types to Java enum types. EnumCodec requires the Postgres enum type name and the Java enum type to map enum values to the Postgres protocol and to materialize Enum instances from Postgres results. You can configure a CodecRegistrar through EnumCodec.builder() for one or more enumeration type mappings. Make sure to use different Java enum types, otherwise the driver is not able to distinguish between Postgres OIDs.

Example:

SQL:

CREATE TYPE my_enum AS ENUM ('FIRST', 'SECOND');

Java Model:

enum MyEnumType {
  FIRST, SECOND;
}

Codec Registration:

PostgresqlConnectionConfiguration.builder()
        .codecRegistrar(EnumCodec.builder().withEnum("my_enum", MyEnumType.class).build());

When available, the driver also registers an array variant of the codec.
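
Once the codec is registered, enum values can be bound and consumed directly. A minimal sketch, assuming an open connection and a hypothetical my_table with a my_enum column named current_state:

connection.createStatement("INSERT INTO my_table (current_state) VALUES ($1)")
    .bind("$1", MyEnumType.FIRST) // encoded via the registered EnumCodec
    .execute();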

Data Type Mapping

This reference table shows the type mapping between PostgreSQL and Java data types:

PostgreSQL Type Supported Data Type
bigint Long, Boolean, Byte, Short, Integer, BigDecimal, BigInteger
bit Not yet supported.
bit varying Not yet supported.
boolean or bool Boolean
box Box
bytea ByteBuffer, byte[], Blob
character String
character varying String
cidr Not yet supported.
circle Circle
date LocalDate
double precision Double, Float, Boolean, Byte, Short, Integer, Long, BigDecimal, BigInteger
enumerated types Client code Enum types through EnumCodec
geometry org.locationtech.jts.geom.Geometry
hstore Map
inet InetAddress
integer Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger
interval Interval
json Json, String. Reading: ByteBuf, byte[], ByteBuffer, String, InputStream
jsonb Json, String. Reading: ByteBuf, byte[], ByteBuffer, String, InputStream
line Line
lseg Lseg
macaddr Not yet supported.
macaddr8 Not yet supported.
money Not yet supported. Please don't use this type. It is a very poor implementation.
name String
numeric BigDecimal, Boolean, Byte, Short, Integer, Long, BigInteger
oid Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger
path Path
pg_lsn Not yet supported.
point Point
polygon Polygon
real Float, Double, Boolean, Byte, Short, Integer, Long, BigDecimal, BigInteger
smallint Short, Boolean, Byte, Integer, Long, BigDecimal, BigInteger
smallserial Integer, Boolean, Byte, Short, Long, BigDecimal, BigInteger
serial Long, Boolean, Byte, Short, Integer, BigDecimal, BigInteger
text String, Clob
time [without time zone] LocalTime
time [with time zone] OffsetTime
timestamp [without time zone] LocalDateTime, LocalTime, LocalDate, java.util.Date
timestamp [with time zone] OffsetDateTime, ZonedDateTime, Instant
tsquery Not yet supported.
tsvector Not yet supported.
txid_snapshot Not yet supported.
uuid UUID, String
xml Not yet supported.
vector Vector, float[]

The first Java type listed for each PostgreSQL type indicates the native (default) type.

Support for the following single-dimensional arrays (read and write):

PostgreSQL Type Supported Data Type
bytea[] ByteBuffer[], byte[][]
boolean[] or bool[] Boolean[]
box[] Box[]
character[] String[]
character varying[] String[]
circle[] Circle[]
date[] LocalDate[]
double precision[] Double[], Float[], Boolean[], Byte[], Short[], Integer[], Long[], BigDecimal[], BigInteger[]
enumerated type arrays Client code Enum[] types through EnumCodec
inet[] InetAddress[]
integer[] Integer[], Boolean[], Byte[], Short[], Long[], BigDecimal[], BigInteger[]
interval[] Interval[]
line[] Line[]
lseg[] Lseg[]
numeric[] BigDecimal[], Boolean[], Byte[], Short[], Integer[], Long[], BigInteger[]
path[] Path[]
point[] Point[]
polygon[] Polygon[]
real[] Float[], Double[], Boolean[], Byte[], Short[], Integer[], Long[], BigDecimal[], BigInteger[]
smallint[] Short[], Boolean[], Byte[], Integer[], Long[], BigDecimal[], BigInteger[]
smallserial[] Integer[], Boolean[], Byte[], Short[], Long[], BigDecimal[], BigInteger[]
serial[] Long[], Boolean[], Byte[], Short[], Integer[], BigDecimal[], BigInteger[]
text[] String[]
time[] [without time zone] LocalTime[]
time[] [with time zone] OffsetTime[]
timestamp[] [without time zone] LocalDateTime[], LocalTime[], LocalDate[], java.util.Date[]
timestamp[] [with time zone] OffsetDateTime[], ZonedDateTime[], Instant[]
uuid[] UUID[], String[]
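
A minimal sketch of writing a single-dimensional array through a parameterized statement, assuming an open connection and a hypothetical table with_int_arrays with an integer[] column my_array:

connection.createStatement("INSERT INTO with_int_arrays (my_array) VALUES ($1)")
    .bind("$1", new Integer[]{1, 2, 3}) // bound and transferred as integer[]
    .execute();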

Extension mechanism

This driver accepts the following extensions:

  • CodecRegistrar to contribute Codecs for PostgreSQL ObjectIDs.

Extensions can be registered programmatically using PostgresqlConnectionConfiguration or discovered using Java's ServiceLoader mechanism (from META-INF/services/io.r2dbc.postgresql.extension.Extension).

The driver ships with built-in dynamic codecs (e.g. hstore, PostGIS geometry) that are registered during the connection handshake depending on their availability while connecting. Note that Postgres extensions registered after a connection was established require a reconnect to initialize the codec.

Logging

If SLF4J is on the classpath, it will be used. Otherwise, there are two possible fallbacks: Console or java.util.logging.Logger. By default, the Console fallback is used. To use the JDK loggers, set the reactor.logging.fallback System property to JDK.
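
For example, the fallback can be switched programmatically before any Reactor classes are loaded (equivalent to passing -Dreactor.logging.fallback=JDK on the command line):

// select java.util.logging as the Reactor logging fallback
System.setProperty("reactor.logging.fallback", "JDK");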

Logging facilities:

  • Driver Logging (io.r2dbc.postgresql)
  • Query Logging (io.r2dbc.postgresql.QUERY on DEBUG level)
  • Parameters' values Logging (io.r2dbc.postgresql.PARAM on DEBUG level)
  • Transport Logging (io.r2dbc.postgresql.client)
    • DEBUG enables Message exchange logging
    • TRACE enables traffic logging

Logging that is associated with a connection reports the logical connection id (cid) which is a driver-local connection counter and the Postgres Process Id (pid) once the connection handshake finishes.

Getting Help

Having trouble with R2DBC? We'd love to help!

Reporting Issues

R2DBC uses GitHub as its issue tracking system to record bugs and feature requests. If you want to raise an issue, please follow the recommendations below:

  • Before you log a bug, please search the issue tracker to see if someone has already reported the problem.
  • If the issue doesn't already exist, create a new issue.
  • Please provide as much information as possible with the issue report; we'd like to know the version of R2DBC PostgreSQL that you are using and the JVM version.
  • If you need to paste code or include a stack trace, use Markdown ``` escapes before and after your text.
  • If possible, try to create a test case or project that replicates the issue. Attach a link to your code or a compressed file containing your code.

Building from Source

You don't need to build from source to use R2DBC PostgreSQL (binaries are available in Maven Central), but if you want to try out the latest and greatest, R2DBC PostgreSQL can be built easily with the Maven wrapper. You also need JDK 1.8 and Docker to run integration tests.

 $ ./mvnw clean install

If you want to build with the regular mvn command, you will need Maven v3.5.0 or above.

Also see CONTRIBUTING.adoc if you wish to submit pull requests.

Running JMH Benchmarks

Running the JMH benchmarks builds and runs the benchmarks without running tests.

 $ ./mvnw clean install -Pjmh

License

This project is released under version 2.0 of the Apache License.


r2dbc-postgresql's Issues

Error message for authentication failure is not very useful

Hi, this is a first-timers-only issue. This means we've worked to make it more legible to folks who either haven't contributed to our codebase before or even folks who haven't contributed to open source before.

If that's you, we're interested in helping you take the first step and can answer questions and help you out as you do. Note that we're especially interested in contributions from people from groups underrepresented in free and open source software!

If you have contributed before, consider leaving this one for someone new, and looking through our general ideal-for-contribution issues. Thanks!

Background

PostgreSQL returns a single ErrorResponse message. When an authentication error is returned it's not clear that this common error has occurred.

Problem

Right now, it's not obvious when an authentication error, which is very common, has occurred.

Solution

An R2dbcAuthenticationException should be created as a subclass of R2dbcException to indicate that an authentication error has occurred. This type should then be returned when the ReactorNettyClient encounters a 28P01 error code message.

Steps to fix

  • Claim this issue with a comment below and ask any clarifying questions you need.
  • Try to fix the issue following the steps above.
  • Commit your changes and start a pull request.

Deliverables

Document supported data types

Hi, this is a first-timers-only issue. This means we've worked to make it more legible to folks who either haven't contributed to our codebase before or even folks who haven't contributed to open source before.

If that's you, we're interested in helping you take the first step and can answer questions and help you out as you do. Note that we're especially interested in contributions from people from groups underrepresented in free and open source software!

If you have contributed before, consider leaving this one for someone new, and looking through our general ideal-for-contribution issues. Thanks!

Background

We should document which PostgreSQL data types are currently supported by this driver to document features/limitations. Any non-documented data type is not supported.

Problem

Right now, it's not obvious which data types can be used and which Java types PostgreSQL types map to.

Solution

Supported data types can be found by checking for Codec implementations in the io.r2dbc.postgresql.codec package. Supported data types can be presented as a table (matrix) between the PostgreSQL data type (e.g. VARCHAR) and the Java type it maps to (e.g. java.lang.String, based on StringCodec).

The table with the type mapping should be added to our README.md.

Steps to fix

  • Claim this issue with a comment below and ask any clarifying questions you need.
  • Try to fix the issue following the steps above.
  • Commit your changes and start a pull request.

Deliverables

  • Updated README.md containing a section documenting which PostgreSQL data types are currently supported by this driver.

Support of rw/ro database servers

An interesting feature would be the ability to use a master server (for reads/writes) and one-to-n slave servers (for reads only).

Are there any plans for this?

Executing multiple statements in a single SQL string with bind parameters fails

Using 1.0.0.M7, executing multiple statements in a single SQL string with bind parameters fails:

@Test
void insertRowsByMultipleInsertIntoWithBind() {
  int[] ids = new int[]{nextId(), nextId(), nextId()};
  String sql = "insert into T(id) values($1);insert into T(id) values($2), ($3);";
  StepVerifier.create(
    connection(cache)
      .flatMapMany(c -> c.createStatement(sql)
        .bind("$1", ids[0])
        .bind("$2", ids[1])
        .bind("$3", ids[3])
        .execute())
      .flatMap(Result::getRowsUpdated)
  ).expectNext(1).expectNext(2).verifyComplete();
}

r2dbc-postgres error:

failed (expected: onNext(1); actual: onError(java.lang.IllegalArgumentException: Statement 'insert into T(id) values($1);
insert into T(id) values($2), ($3);' cannot be created. This is often due to the presence of both multiple statements and parameters at the same time.))

r2dbc-h2 error:

failed (expected: onNext(1); actual: onError(java.lang.ArrayIndexOutOfBoundsException: 3))

But execution succeeds if the bind parameters are changed to raw values, such as 'insert into T(id) values(1);insert into T(id) values(2), (3);'.

Method runs forever if Result.map is called for a bind-parameter INSERT INTO SQL

With 1.0.0.M7 or M6, the method below will run forever:

@Test
@Disabled("postgres will run forever")
void insertOneAndDoResultMapWithBind() {
  StepVerifier.create(
    connection(cache)
      .flatMapMany(c -> c.createStatement("insert into T(id) values($1)").bind("$1", 9).execute())
      .flatMap(result -> result.map((row, rowMetadata) -> 1))
  ).verifyComplete();
}

But if changed to 'c.createStatement("insert into T(id) values(9)").execute()', the test passes.

Add LogicalDecoding

PostgreSQL has a feature that enables streaming changes to the client.
Add functionality for this.

Conflict between io.projectreactor.netty and io.projectreactor.ipc

I am building a small service for which I use spring webflux boot on netty together with r2dbc-postgresql. I am able to start it just fine, and when I perform a request I get the response back.

The problem comes up when request processing involves a DB interaction via r2dbc-postgresql. I keep getting initialization errors for the class ReactorNetty.java from io.projectreactor.netty after the response was sent successfully:

Caused by: java.lang.IllegalArgumentException: 'PERSISTENT_CHANNEL' is already in use
	at io.netty.util.ConstantPool.createOrThrow(ConstantPool.java:113) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.util.ConstantPool.newInstance(ConstantPool.java:95) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at io.netty.util.AttributeKey.newInstance(AttributeKey.java:55) ~[netty-common-4.1.29.Final.jar:4.1.29.Final]
	at reactor.netty.ReactorNetty.<clinit>(ReactorNetty.java:614) ~[reactor-netty-0.8.0.RELEASE.jar:0.8.0.RELEASE]

The thing is that before ReactorNetty from io.projectreactor.netty (r2dbc-postgresql's dependency) tries to initialize, another ReactorNetty.java from the project io.projectreactor.ipc (spring webflux boot's dependency) has already initialized itself. As both classes, while initializing, put the same String values into the ConstantPool (mentioned in the stacktrace above), where a value can't be put twice, initialization fails for the class that comes later.
Btw. "spring boot's ReactorNetty" inits itself before the response is sent to the client and "r2dbc's ReactorNetty" after the response is sent.

Now, as said, I get the value out of the DB alright, and the response as well; it's just that I have a log full of errors, and I guess moreover that this is not just a cosmetic issue, although I don't see any side effects.

Deps

    compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
    compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
    compile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
    compile "org.springframework.boot:spring-boot-starter-webflux:2.0.6.RELEASE"
    compile "io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT"
    compile "org.springframework.data:spring-data-jdbc:1.0.0.r2dbc-SNAPSHOT"

DB connection

@Bean
open fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
    val context = RelationalMappingContext()
    context.afterPropertiesSet()
    return R2dbcRepositoryFactory(client, context)
}

@Bean
open fun databaseClient(factory: ConnectionFactory): DatabaseClient {
    return DatabaseClient.builder().connectionFactory(factory).build()
}

@Bean
open fun connectionFactory(): PostgresqlConnectionFactory {
    val config = PostgresqlConnectionConfiguration.builder()
            .host("localhost")
            .port(5432)
            .database("test")
            .username("postgres")
            .password("example")
            .build()
    return PostgresqlConnectionFactory(config)
}

Repo

import org.springframework.data.jdbc.repository.query.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import reactor.core.publisher.Flux

interface UserRepository: ReactiveCrudRepository<Test, Int> {
    @Query("select * from test")
    fun test(): Flux<Test>
}

StringCodec#doCanDecode should handle BPCHAR [character(n)]

Columns with type character(n) are mapped to PostgresqlObjectId#BPCHAR but aren't considered decodable by StringCodec#doCanDecode.
Actually, I am getting an IllegalArgumentException when trying to decode such a column (Row#get(COLNAME, String.class)); it is thrown in DefaultCodecs#decode.

Reduce usage of Optional and Stream in hot code paths

We should replace Optional and Stream in hot code paths to avoid GC pressure and CPU overhead due to additional method invocations and intermediate object creation.

Using the Stream API makes the code more functional/readable than using loops and if statements, yet the overhead caused by using Optional and Stream can be significant.

Custom Operator

Having some experience doing this now, the client portion of the implementation (i.e. the portion that transforms Reactive Streams into the wire protocol) should be implemented as a custom operator. This would allow the use of onSubscribe() to establish the connection, cancel() to terminate it, and request() to call for more data from a portal. This is much easier to troubleshoot and more intuitive to maintain than the current nesting dolls of reactive flows.

memory leak in BackendMessageDecoder

Hi, doing some testing and it appears there is a memory leak in the BackendMessageDecoder. It doesn't seem to show up unless you're under load and run for a while. I turned on the Netty leak detector and it prints warnings pointing to BackendMessageDecoder, and then I get DirectMemory OOM exceptions. You might want to use the CompositeByteBuf API directly.

PostgresqlConnectionFactory.create() is blocking

I'm using 1.0.0.BUILD-SNAPSHOT with reactor-netty 0.8.0.M2.

The following exception occurred when creating a Connection on the thread of Spring WebFlux

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:111)
	at reactor.core.publisher.Mono.block(Mono.java:1490)
	at reactor.netty.tcp.TcpClient.connectNow(TcpClient.java:219)
	at reactor.netty.tcp.TcpClient.connectNow(TcpClient.java:205)
	at io.r2dbc.postgresql.client.ReactorNettyClient.<init>(ReactorNettyClient.java:124)
	at io.r2dbc.postgresql.PostgresqlConnectionFactory.lambda$new$0(PostgresqlConnectionFactory.java:50)
	at io.r2dbc.postgresql.PostgresqlConnectionFactory.create(PostgresqlConnectionFactory.java:61)

Creating a ReactorNettyClient seems to be blocking, and one is created per Connection.

I think Connection will be created per request, so this behavior is not expected.

Getting this error - FATAL: sorry, too many clients already

When running several thousand queries in a row (single selects), I get this error:
2018-10-15 17:27:43.011 PDT [33855] FATAL: sorry, too many clients already

Here's the Java client code:

  public Mono<Department> getDepartmentById(int id) {
    String sql = "SELECT * FROM department WHERE department_id = $1";

    return r2dbc
        .withHandle(
            handle ->
                handle
                    .select(sql, id)
                    .mapRow(
                        row -> {
                          return Department.newBuilder()
                              .setId(row.get("department_id", Integer.class))
                              .setName(row.get("department_name", String.class))
                              .build();
                        }))
        .single();
  }

Flux.range(1, 10_000)
        .flatMap(
            i -> {
              int id = ThreadLocalRandom.current().nextInt(1, 5);
              return employeeRepository.getDepartmentById(id);
            })
        .doOnNext(department -> System.out.println(department.toString()))
        .doOnError(Throwable::printStackTrace)
        .blockLast();

This is what the table looks like:

CREATE TABLE department (
  department_id  INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  department_name VARCHAR (100) NOT NULL
);

INSERT INTO department (department_name) VALUES ('Sales');
INSERT INTO department (department_name) VALUES ('Marketing');
INSERT INTO department (department_name) VALUES ('Human Resources');
INSERT INTO department (department_name) VALUES ('Manufacturing');
INSERT INTO department (department_name) VALUES ('Accounting');

Handling of multiple authentication methods is not correct.

The way the protocol works is that the backend sends a message requesting the authentication it desires.
After the auth request, the next 4 bytes are the authentication method. I think what needs to be done is to have multiple handlers added to the startup message and then react to the correct authentication method?

Support connection pooling

To boost performance, it would be interesting to implement connection pooling.
Are there any plans for this?

Binding parameters to a statement with newlines (\n) does not work

I would like to include newline characters (\n) in the query string to improve readability, but this causes an UnsupportedOperationException "Binding parameters is not supported for the statement" to be thrown if parameters are bound to the statement.

E.g. the query

SELECT f.name
FROM foo f
WHERE f.id = $1

is not matched by the ExtendedQueryPostgresqlStatement::supports method ( https://github.com/r2dbc/r2dbc-postgresql/blob/master/src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java#L124 ) and thus SimpleQueryPostgresqlStatement gets selected as the implementation.
Maybe .find() should be used in the condition instead of .matches().
Otherwise, maybe the pattern would need to include or be compiled with the MULTILINE flag ( https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#MULTILINE ).

Example from README is not working

Usage of:

    ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
...
    .build());

Mono<Connection> connection = connectionFactory.create();

gives this compiler error: incompatible types: org.reactivestreams.Publisher<capture#1 of ? extends io.r2dbc.spi.Connection> cannot be converted to reactor.core.publisher.Mono<io.r2dbc.spi.Connection>

I was able to fix the error using PostgresqlConnectionFactory and Mono<PostgresqlConnection>. But it still doesn't make the example usable. Here is the error:

cannot find symbol
  symbol:   method createStatement(java.lang.String)
  location: variable connection of type reactor.core.publisher.Mono<io.r2dbc.postgresql.PostgresqlConnection>

Executing multi-line select SQL fails

Using 1.0.0.M6, executing the multi-line select SQL below fails:

select id from T
  where id in ($1, $2)
  order by id asc

r2dbc-postgres error:

Suppressed: java.lang.UnsupportedOperationException: Binding parameters is not supported for the statement ...

r2dbc-h2 error:

Suppressed: org.h2.message.DbException: Method is not allowed for a query. Use execute or executeQuery instead of executeUpdate [90001-197]

But execution succeeds if changed to a single-line SQL statement such as 'select id from T where id in ($1, $2) order by id asc'.

Row.get(…, Object.class) fails with java.lang.IllegalArgumentException: Cannot decode value of type java.lang.Object

Calling Row.get(…) with Object.class fails with IllegalArgumentException

Caused by: java.lang.IllegalArgumentException: Cannot decode value of type java.lang.Object
		at io.r2dbc.postgresql.codec.DefaultCodecs.decode(DefaultCodecs.java:85)
		at io.r2dbc.postgresql.PostgresqlRow.get(PostgresqlRow.java:82)

Code to reproduce:

SQL

CREATE TABLE IF NOT EXISTS legoset (
    id          integer CONSTRAINT id PRIMARY KEY,
    name        varchar(255) NOT NULL,
    manual      integer NULL
);

INSERT INTO legoset (id, name, manual) VALUES(42055, 'SCHAUFELRADBAGGER', 12);

Java

Mono.from(connectionFactory.create()).map(c -> c.createStatement("SELECT manual FROM legoset"))
		.flatMapMany(Statement::execute)
		.flatMap(it -> it.map((r, md) -> r.get("manual", Object.class)))
		.as(StepVerifier::create)
		.expectNextCount(1)
		.verifyComplete();

Remote host connection closed problem

I was trying to play with r2dbc but cannot get it to work. I think it is a problem with communication between the driver and the database. I could not debug the exact problem because I do not understand reactor code (yet). But here is a DEBUG log of the problem. What I am sure of is that the connection information is OK, because I see in the logs that the SQL was sent and the response was delivered.

19:58:54.642 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:58:54.759 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
19:58:54.784 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
19:58:54.786 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
19:58:54.788 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
19:58:54.789 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
19:58:54.790 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
19:58:54.790 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
19:58:54.790 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
19:58:54.791 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
19:58:54.791 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
19:58:54.791 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
19:58:54.792 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
19:58:54.792 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\Rene\AppData\Local\Temp (java.io.tmpdir)
19:58:54.792 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
19:58:54.794 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
19:58:54.794 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3810525184 bytes
19:58:54.794 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
19:58:54.795 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
19:58:54.810 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
19:58:54.810 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
19:58:54.814 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
19:58:54.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
19:58:54.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
19:58:54.829 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
19:58:54.829 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
19:58:54.996 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
19:58:54.996 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
19:58:55.016 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
19:58:55.037 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
19:58:55.037 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
19:58:55.046 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
19:58:55.287 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
19:58:55.288 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
19:58:55.317 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 104488 (auto-detected)
19:58:55.495 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:15:5d:ff:fe:00:0c:06 (auto-detected)
19:58:55.516 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
19:58:55.516 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
19:58:55.516 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
19:58:55.542 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x88238320] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
19:58:55.552 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.NewConnectionProvider - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Connected new channel
19:58:55.553 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.NewConnectionProvider - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] onStateChange([connected], SimpleConnection{channel=[id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432]})
19:58:55.555 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.NewConnectionProvider - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432]}})
19:58:55.598 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.FluxReceive - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
19:58:55.612 [reactor-tcp-nio-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
19:58:55.613 [reactor-tcp-nio-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@5f6ba8cd
19:58:55.618 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Request:  StartupMessage{applicationName='postgresql-r2dbc', database='postgres', username='postgres'}
19:58:55.623 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Writing object 
19:58:55.627 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
19:58:55.627 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
19:58:55.627 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
19:58:55.627 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
19:58:55.715 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: AuthenticationMD5Password{salt=java.nio.DirectByteBuffer[pos=0 lim=4 cap=4]}
19:58:55.719 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Request:  PasswordMessage{password='md5247c47bead8536029078c38ee927e92a'}
19:58:55.720 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Writing object 
19:58:55.722 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: AuthenticationOk{}
19:58:55.723 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='application_name', value='postgresql-r2dbc'}
19:58:55.723 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='client_encoding', value='UTF8'}
19:58:55.723 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='DateStyle', value='ISO, DMY'}
19:58:55.723 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='integer_datetimes', value='on'}
19:58:55.723 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='IntervalStyle', value='postgres'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='is_superuser', value='on'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='server_encoding', value='UTF8'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='server_version', value='10.5'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='session_authorization', value='postgres'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='standard_conforming_strings', value='on'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ParameterStatus{name='TimeZone', value='Europe/Belgrade'}
19:58:55.724 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: BackendKeyData{processId=33572, secretKey=-933421311}
19:58:55.725 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ReadyForQuery{transactionStatus=IDLE}
19:58:55.754 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Request:  Query{query='select 123 as datname'}
19:58:55.754 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Writing object 
19:58:55.762 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: RowDescription{fields=[Field{column=0, dataType=23, dataTypeModifier=-1, dataTypeSize=4, format=TEXT, name='datname', table=0}]}
19:58:55.785 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: DataRow{columns=[UnpooledSlicedByteBuf(ridx: 0, widx: 3, cap: 3/3, unwrapped: PooledUnsafeDirectByteBuf(ridx: 47, widx: 67, cap: 512))]}
19:58:55.787 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: CommandComplete{command=SELECT, rowId=null, rows=1}
19:58:55.793 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Response: ReadyForQuery{transactionStatus=IDLE}
19:58:55.796 [reactor-tcp-nio-1] DEBUG io.r2dbc.postgresql.client.ReactorNettyClient - Request:  Terminate{}
19:58:55.797 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x88238320, L:/127.0.0.1:49575 - R:localhost/127.0.0.1:5432] Writing object 
Exception in thread "main" 19:58:55.803 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.NewConnectionProvider - [id: 0x88238320, L:/127.0.0.1:49575 ! R:localhost/127.0.0.1:5432] onStateChange([disconnecting], ChannelOperations{SimpleConnection{channel=[id: 0x88238320, L:/127.0.0.1:49575 ! R:localhost/127.0.0.1:5432]}})
reactor.core.Exceptions$ReactiveException: java.io.IOException: An existing connection was forcibly closed by the remote host
	at reactor.core.Exceptions.propagate(Exceptions.java:326)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
	at reactor.core.publisher.Flux.blockLast(Flux.java:2360)
	at primrose.DemoApplication.main(DemoApplication.java:26)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		... 2 more
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:628)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:563)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:748)

StringCodec doCanDecode test wrong

StringCodec at line 51 tests whether the return type from the database is:
TEXT == format && VARCHAR == type
I think it should be:
TEXT == format || VARCHAR == type

Row.get(…) fails for array-typed values

Calling Row.get(…) or Row.get(…, Object.class) fails for array-typed columns with:

java.lang.IllegalArgumentException: Cannot decode value of type java.lang.Object

Expectation

Row.get(…) should return the array value with the most appropriate type. For INT[] that should be Integer[].class.

Steps to reproduce

SQL:

CREATE TABLE with_arrays (my_array INT[]);
INSERT INTO with_arrays (my_array) VALUES('{1,2,3}')

Code:

Mono.from(connectionFactory.create())
    .flatMapMany(it -> {
        return it.createStatement("SELECT my_array FROM with_arrays").execute();
    }).flatMap(it -> it.map((row, rowMetadata) -> row.get("my_array")))
    .doOnNext(System.out::println)
    .as(StepVerifier::create)
    .expectNextCount(1)
    .verifyComplete();

Mapping result error is not propagating upstream

After playing a bit with the driver, I noticed that when an error happens while mapping a row, the error wasn't being pushed upstream; so I registered a doOnError and propagated it upwards.

However, the upstream hook doOnError ("2. doOnError") never gets called. Am I doing something wrong?

r2dbc.withHandle(handle ->
     handle
           .select(SELECT_INNER_JOIN)
           .mapResult(result -> 
                 result.map((rw, rm) -> rw.get("product_id", Integer.class))) // mapping
            .doOnError(Mono::error) // push error upstream
     )
     .doOnError(throwable -> LOG.error("2. doOnError", throwable))
     .subscribe();
