cqerl's Introduction

CQErl

Native Erlang client for CQL3 over Cassandra's latest binary protocol v4.

Usage · Connecting · Clusters · Performing queries · Query options · Batched queries · Reusable queries · Data types

Installation · Compatibility · Tests · License

At a glance

CQErl offers a simple Erlang interface to Cassandra using the latest CQL version. The main features include:

  • Automatic connection pooling, with hash-based allocation (à la dispcount)
  • Single or multi-cluster support
  • Batched queries
  • Variable bindings in CQL queries (named or positional)
  • Automatic query reuse when including variable bindings
  • Collection types support
  • User-defined type support
  • Tunable consistency level
  • Automatic pagination support
  • Synchronous or asynchronous queries
  • Automatic compression (using lz4 or snappy if available)
  • SSL support
  • Pluggable authentication (as long as it's SASL-based)

CQErl was designed to be as simple as possible on your side. You just provide the configuration you want as environment variables, and request a client every time you need to perform a transient piece of work (e.g. handle a web request). You do not need to (and in fact should not) keep a client in state for a long time. Under the hood, CQErl maintains a pool of persistent connections with Cassandra, and this pattern is the best way to ensure that requests are properly load-balanced across the pool.

Usage

Connecting

If you installed Cassandra and didn't change any configuration related to authentication or SSL, you should be able to connect like this:

{ok, Client} = cqerl:get_client({}).

You do not need to close the connection after you've finished using it.

  1. The first argument to cqerl:get_client/1,2 is the node to which you wish to connect, as {Ip, Port}. If empty, it defaults to {"127.0.0.1", 9042}. Ip can be given as a string or as a tuple of components, either IPv4 or IPv6.

    • If the default port is used, you can provide just the IP address as the first argument, either as a tuple, list or binary.
    • If both the default port and localhost are used, you can just provide an empty tuple as the first argument.
  2. The second possible argument (when using cqerl:get_client/2) is a list of options, that include:

    • keyspace (string or binary), which determines the keyspace that all subsequent requests on that connection operate in.
    • auth (mentioned below)
    • ssl, which is false by default but can be set to a list of SSL options.
    • protocol_version to connect to older Cassandra instances.

    If you've set up a simple username/password authentication scheme on Cassandra, you can provide the credentials to CQErl:

    {ok, Client} = cqerl:get_client({}, [{auth, {cqerl_auth_plain_handler, [{"test", "aaa"}]}}]).

    Since Cassandra implements pluggable authentication mechanisms, CQErl also allows you to provide custom authentication modules (here cqerl_auth_plain_handler). The options you pass along with it are given to the module's auth_init/3 as its first argument.

  3. You can leverage one or more clusters of Cassandra nodes by setting up clusters. Once they are set up, you can use:

    1. cqerl:get_client() if you have just a single main cluster
    2. cqerl:get_client(ClusterKey) if you want to get a client from a specific, identified cluster


Using environment variables

All the options given above can be provided as environment variables, in which case they are used as default (and overridable) values for any cqerl:get_client call. You can also provide a cassandra_nodes variable containing a list of the tuples used as the first argument to cqerl:get_client (see clusters for more explanations). For example, your app.config or sys.config file could have the following content:

[
  {cqerl, [
            {cassandra_nodes, [ { "127.0.0.1", 9042 } ]},
            {ssl, [ {cacertfile, "cassandra.pem"} ]},
            {auth, {cqerl_auth_plain_handler, [ {"test", "aaa"} ]}}
          ]}
].
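
A stray trailing comma before a closing bracket, or a missing final period, prevents a config file like this from loading. One way to check a fragment before deploying it is to parse it as an Erlang term: file:consult/1 does this for a file on disk, and the hypothetical helper below (cqerl_config_check is not part of CQErl) does the same for a string using erl_scan:string/1 and erl_parse:parse_term/1.

```erlang
-module(cqerl_config_check).
-export([parse/1]).

%% Hypothetical helper: parses the text of a sys.config-style file into an
%% Erlang term. Returns {ok, Term} or {error, Reason}; the text must end
%% with a '.', just like a real config file.
parse(Text) ->
    case erl_scan:string(Text) of
        {ok, Tokens, _EndLocation} ->
            erl_parse:parse_term(Tokens);
        {error, ErrorInfo, _ErrorLocation} ->
            {error, ErrorInfo}
    end.
```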

Doing so will fire up connection pools as soon as the CQErl application is started. So when later on you call cqerl:get_client, chances are you will hit a preallocated connection (unless they're so busy that CQErl needs to fire up new ones). In fact, if you provide the cassandra_nodes environment variable, you can call cqerl:get_client/0, which chooses an available client at random.

Clusters

With CQErl clusters, you can configure either a single set of cassandra nodes from which you can draw a client at any time, or multiple sets that serve different purposes.

Single cluster

You can prepare a single cluster setup using this structure in your sys.config file:

[
  {cqerl, [ {cassandra_nodes, [ 
                % You can use any of the forms below to specify a cassandra node
                { "127.0.0.1", 9042 },
                { {127, 0, 0, 2}, 9042 },
                "127.0.0.3"
            ]},
            { keyspace, dev_keyspace }
          ]}
].

or, equivalently, there's an API you can use to add nodes to the single main cluster:

cqerl_cluster:add_nodes([
    { "127.0.0.1", 9042},
    { {127, 0, 0, 2}, 9042 },
    "127.0.0.3"
]).

or, with connection options:

cqerl_cluster:add_nodes([
    { "127.0.0.1", 9042},
    { {127, 0, 0, 2}, 9042 },
    "127.0.0.3"
], [
    { keyspace, dev_keyspace }
]).

When your main cluster is configured, you can just use cqerl:get_client/0 to get a random client from the cluster.

Multiple clusters

You can prepare multiple clusters using this structure in your sys.config file:

[
  {cqerl, [ {cassandra_clusters, [
                { config, {
                    [ "127.0.0.1", "127.0.0.3" ], 
                    [ { keyspace, config } ] 
                }},
                { operations, {
                    [ "127.0.0.1:9042", {"127.0.0.1", 9042} ], 
                    [ { keyspace, operations } ] 
                }}
            ]}
          ]}
].

or, equivalently, there's an API you can use to add nodes to particular clusters:

cqerl_cluster:add_nodes(config, [
    { "127.0.0.1", 9042},
    "127.0.0.3"
], [
    { keyspace, config }
]).
cqerl_cluster:add_nodes(operations, [
    { "127.0.0.1", 9042},
    "127.0.0.3"
], [
    { keyspace, operations }
]).

Options

There are two application environment variables that may be set to change query behaviour:

  • {maps, true} will cause query result rows to be returned as maps instead of proplists
  • {text_uuids, true} will cause timeuuid and uuid fields to be returned as binary strings in canonical form (e.g. <<"5620c844-e98d-11e5-b97b-08002719e96e">>) rather than as raw binaries.
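
The two UUID representations that text_uuids switches between are the 16-byte binary and its canonical 8-4-4-4-12 hexadecimal text. A minimal sketch of the conversion in both directions, written as a hypothetical module (uuid_text_sketch is not part of CQErl), looks like this:

```erlang
-module(uuid_text_sketch).
-export([to_text/1, from_text/1]).

%% Hypothetical helper: formats a 16-byte UUID binary in canonical,
%% lowercase 8-4-4-4-12 form, e.g. <<"5620c844-e98d-11e5-b97b-08002719e96e">>.
to_text(<<A:32, B:16, C:16, D:16, E:48>>) ->
    iolist_to_binary(
      io_lib:format("~8.16.0b-~4.16.0b-~4.16.0b-~4.16.0b-~12.16.0b",
                    [A, B, C, D, E])).

%% Parses the canonical text form (string or binary) back into 16 bytes.
from_text(Text) when is_list(Text) ->
    from_text(list_to_binary(Text));
from_text(<<A:8/binary, $-, B:4/binary, $-, C:4/binary, $-,
            D:4/binary, $-, E:12/binary>>) ->
    Hex = <<A/binary, B/binary, C/binary, D/binary, E/binary>>,
    <<(binary_to_integer(Hex, 16)):128>>.
```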

Performing queries

Performing a query can be as simple as this:

{ok, Result} = cqerl:run_query(Client, "SELECT * FROM users;").

% Equivalent to
{ok, Result} = cqerl:run_query(Client, <<"SELECT * FROM users;">>).

% Also equivalent to
{ok, Result} = cqerl:run_query(Client, #cql_query{statement = <<"SELECT * FROM users;">>}).

The same query can also be performed asynchronously:

Tag = cqerl:send_query(Client, "SELECT * FROM users;"),
receive
    {result, Tag, Result} ->
        ok
end.

In situations where you do not need to wait for the response at all, it's perfectly fine to produce this sort of pattern:

{ok, Client} = cqerl:get_client(),
cqerl:send_query(Client, #cql_query{statement="UPDATE secrets SET value = null WHERE id = ?;",
                                    values=[{id, <<"42">>}]}).

That is, you can grab a client only to send a query and then get rid of it. CQErl will still perform the query; the difference is that no response will be sent back to you.

Here's a rundown of the possible return values:

  • SELECT queries will yield result of type #cql_result{} (more details below).
  • Queries that change the database schema will yield result of type #cql_schema_changed{type, keyspace, table}
  • Other queries will yield void if everything worked correctly.
  • In any case, errors returned by Cassandra in response to a query are reported as the return value ({error, Reason} in the synchronous case, and {error, Tag, Reason} in the asynchronous case).
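
Because asynchronous results arrive as messages, a caller that does want the answer usually wraps the receive in a timeout. The sketch below assumes only the message shapes described above ({result, Tag, Result} and {error, Tag, Reason}); cqerl_await_sketch:await/2 is a hypothetical helper, and Tag would be the value returned by cqerl:send_query/2.

```erlang
-module(cqerl_await_sketch).
-export([await/2]).

%% Hypothetical helper: waits for the reply matching Tag. Because Tag is
%% already bound, the receive only picks messages for this query and leaves
%% replies to other queries in the mailbox.
await(Tag, Timeout) ->
    receive
        {result, Tag, Result} -> {ok, Result};
        {error, Tag, Reason} -> {error, Reason}
    after Timeout ->
        {error, timeout}
    end.
```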

#cql_result{}

The return value of SELECT queries is a #cql_result{} record, which can be used to obtain rows as proplists and to fetch more results, if available:

{ok, _SchemaChange} = cqerl:run_query(Client, "CREATE TABLE users(id uuid, name varchar, password varchar);"),
{ok, void} = cqerl:run_query(Client, #cql_query{
    statement = "INSERT INTO users(id, name, password) VALUES(?, ?, ?);",
    values = [
        {id, new},
        {name, "matt"},
        {password, "qwerty"}
    ]
}),
{ok, Result} = cqerl:run_query(Client, "SELECT * FROM users;").

Row = cqerl:head(Result),
Tail = cqerl:tail(Result),
{Row, Tail} = cqerl:next(Result),
1 = cqerl:size(Result),
0 = cqerl:size(Tail),
empty_dataset = cqerl:next(Tail),
[Row] = cqerl:all_rows(Result),

<<"matt">> = proplists:get_value(name, Row),
<<"qwerty">> = proplists:get_value(password, Row).

Pagination

#cql_result{} can also be used to fetch the next page of results, if applicable, synchronously or asynchronously. This uses Cassandra's automatic paging mechanism.

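% Synchronously
case cqerl:has_more_pages(Result) of
    true -> {ok, NextPage} = cqerl:fetch_more(Result);
    false -> ok
end,

% Asynchronously (a distinct variable name avoids clashing with NextPage,
% which is only bound in one branch of the case above)
Tag2 = cqerl:fetch_more_async(Result),
receive
    {result, Tag2, NextPageAsync} -> ok
end.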
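
The synchronous calls above can be combined into a loop that walks every page. In this sketch the paging primitives are passed in as funs so the loop can be exercised without a Cassandra node; with CQErl you would pass fun cqerl:all_rows/1, fun cqerl:has_more_pages/1 and fun cqerl:fetch_more/1, assuming fetch_more/1 returns {ok, NextResult} as shown above. The module name cqerl_pages_sketch is hypothetical.

```erlang
-module(cqerl_pages_sketch).
-export([fold_rows/4]).

%% Hypothetical helper: folds Fun over every row of a paged result set,
%% fetching the next page only after the current one has been consumed.
%% With CQErl, Ops would be
%%   #{all_rows => fun cqerl:all_rows/1,
%%     has_more_pages => fun cqerl:has_more_pages/1,
%%     fetch_more => fun cqerl:fetch_more/1}.
fold_rows(Fun, Acc0, Result, Ops = #{all_rows := AllRows,
                                     has_more_pages := HasMore,
                                     fetch_more := FetchMore}) ->
    Acc1 = lists:foldl(Fun, Acc0, AllRows(Result)),
    case HasMore(Result) of
        true ->
            {ok, Next} = FetchMore(Result),
            fold_rows(Fun, Acc1, Next, Ops);
        false ->
            Acc1
    end.
```

Fetching pages lazily keeps memory use bounded by the page size rather than by the total number of rows.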

#cql_schema_changed{}

#cql_schema_changed{} is returned from queries that change the database schema somehow (e.g. ALTER, DROP, CREATE, and so on). It includes:

  1. The type of change, either created, updated or dropped
  2. The name of the keyspace where the change happened, as a binary
  3. If applicable, the name of the table on which the change was applied, as a binary

Providing options along with queries

When performing queries, you can provide more information than just the query statement using the #cql_query{} record, which includes the following fields:

  1. The query statement, as a string or binary

  2. values for binding variables from the query statement (see next section).

  3. You can tell CQErl to consider a query reusable or not (see below for what that means). By default, CQErl detects binding variables and considers a query reusable if it contains any, named or positional. Queries containing named binding variables are considered reusable no matter what you set reusable to. If you explicitly set reusable to false on a query with positional variable bindings (?), you must provide values as {Type, Value} pairs instead of {Key, Value}.

  4. You can specify how many rows you want in every result page using the page_size (integer) field. The devs at Cassandra recommend a value of 100 (which is the default).

  5. You can also specify what consistency you want the query to be executed under. Possible values include:

    • any
    • one
    • two
    • three
    • quorum
    • all
    • local_quorum
    • each_quorum
    • local_one
  6. In case you want to perform a lightweight transaction using INSERT or UPDATE, you can also specify the serial_consistency that will be used when performing it. Possible values are:

    • serial
    • local_serial

Variable bindings

In the #cql_query{} record, you can provide values as a proplist or a map, where the keys are all atoms that match the column names or binding variable names in the statement, in lowercase.

Example:

% Deriving the value key from the column name
#cql_query{statement="SELECT * FROM table1 WHERE id = ?", values=[{id, SomeId}]},

% Explicitly providing a binding variable name
#cql_query{statement="SELECT * FROM table1 WHERE id = :id_value", values=[{id_value, SomeId}]},

Special cases include:

  • providing TTL and TIMESTAMP options in statements, in which case the proplist keys are [ttl] and [timestamp] respectively (atoms, written '[ttl]' and '[timestamp]' in Erlang source). Note that, while values for a column of type timestamp are provided in milliseconds, a value for the TIMESTAMP option is expected in microseconds.

  • UPDATE keyspace SET set = set + ? WHERE id = 1;. The name for this variable binding is set, the name of the column, and it's expected to be an Erlang list of values.

  • UPDATE keyspace SET list = list + ? WHERE id = 1;. The name for this variable binding is list, the name of the column, and it's expected to be an Erlang list of values.

  • UPDATE keyspace SET map[?] = 1 WHERE id = 1;. The name for this variable binding is key(map), where map is the name of the column.

  • UPDATE keyspace SET map['key'] = ? WHERE id = 1;. The name for this variable binding is value(map), where map is the name of the column.

  • UPDATE keyspace SET list[?] = 1 WHERE id = 1;. The name for this variable binding is idx(list), where list is the name of the column.

  • SELECT * FROM keyspace LIMIT ?. The name for the LIMIT variable is [limit].

Also, when providing the value for a uuid-type column, you can give the value new, strong or weak, in which case CQErl will generate a random UUID (v4), using either a strong or a weak random number generator.

Finally, when providing the value for a timeuuid or timestamp column, you can give the value now, in which case CQErl will generate a UUID (v1) or a normal timestamp, respectively, matching the current date and time.
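
The special binding names above are ordinary atoms, so they can be written directly or built from a column name. The hypothetical helpers below (cqerl_bindings_sketch is not part of CQErl) build the key(...), value(...) and idx(...) names and produce TTL and TIMESTAMP values in the expected units: seconds for the TTL, and microseconds from erlang:system_time/1 for the TIMESTAMP option.

```erlang
-module(cqerl_bindings_sketch).
-export([collection_key/2, write_options/1]).

%% Hypothetical helper: builds the binding name used for the collection
%% cases listed above, e.g. collection_key(value, map_col) -> 'value(map_col)'.
collection_key(Kind, Column) when Kind =:= key; Kind =:= value; Kind =:= idx ->
    list_to_atom(atom_to_list(Kind) ++ "(" ++ atom_to_list(Column) ++ ")").

%% Hypothetical helper: values for "USING TTL ? AND TIMESTAMP ?". The TTL is
%% in seconds; the TIMESTAMP option is in microseconds (unlike timestamp
%% columns, which take milliseconds).
write_options(TtlSeconds) ->
    [{'[ttl]', TtlSeconds},
     {'[timestamp]', erlang:system_time(microsecond)}].
```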

Batched queries

To perform batched queries (which can include any non-SELECT DML statements), simply put one or more #cql_query{} records in a #cql_query_batch{} record, and run it in place of a normal #cql_query{}. #cql_query_batch{} includes the following fields:

  1. The consistency level to apply when executing the batch of queries.
  2. The mode of the batch, which can be logged, unlogged or counter. Running a batch in unlogged mode removes the performance penalty of enforcing atomicity. The counter mode should be used to perform batched mutation of counter values.
  3. Finally, you must specify the list of queries.

InsertQ = #cql_query{statement = "INSERT INTO users(id, name, password) VALUES(?, ?, ?);"},
{ok, void} = cqerl:run_query(Client, #cql_query_batch{
  mode=unlogged,
  queries=[
    InsertQ#cql_query{values = [{id, new},{name, "sean"},{password, "12312"}]},
    InsertQ#cql_query{values = [{id, new},{name, "jenna"},{password, "11111"}]},
    InsertQ#cql_query{values = [{id, new},{name, "kate"},{password, "foobar"}]}
  ]
}).

Reusable queries

If any of the following is true:

  • you set #cql_query{}'s reusable field to true
  • the query contains positional variable bindings (?) and you did not explicitly set reusable to false
  • the query contains named variable bindings (:name) (ignores the value of reusable)

the query is considered reusable. This means that the first time the query is performed, CQErl will ask the connected Cassandra node to prepare it; after that, a query ID is used internally instead of the query statement when executing it. That particular Cassandra node holds on to the prepared query, and subsequent queries that use exactly the same statement are performed faster and with less network traffic.

CQErl can tell which query has been previously prepared on which node by keeping a local cache, so all of this happens correctly and transparently.
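
The local cache described above can be pictured as a map from {Node, Statement} to the query ID returned by the node. This is a hypothetical sketch (prepared_cache_sketch is not CQErl's internal structure); PrepareFun stands in for the round trip that asks a node to prepare a statement and is called only on a cache miss.

```erlang
-module(prepared_cache_sketch).
-export([get_or_prepare/4]).

%% Hypothetical helper: returns {QueryId, Cache1}. The node is part of the
%% key because a statement prepared on one node is not assumed to be
%% prepared on any other.
get_or_prepare(Cache, Node, Statement, PrepareFun) ->
    Key = {Node, Statement},
    case maps:find(Key, Cache) of
        {ok, QueryId} ->
            {QueryId, Cache};
        error ->
            QueryId = PrepareFun(Node, Statement),
            {QueryId, maps:put(Key, QueryId, Cache)}
    end.
```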

Data types

Here is a correspondence of Cassandra column types with their equivalent Erlang types. Values in result sets use the first type listed; the other types are also accepted when binding values.

Cassandra column type   Erlang types
ascii                   binary, string (US-ASCII only)
bigint                  integer (signed 64-bit)
blob                    binary
boolean                 true, false
counter                 integer (signed 64-bit)
date                    calendar:date()
decimal                 {Unscaled :: integer(), Scale :: integer()}
double                  float (signed 64-bit)
float                   float (signed 32-bit)
frozen<thing>           same as <thing>
int                     integer (signed 32-bit)
time                    integer (milliseconds, signed 64-bit), now, binary or string
timestamp               integer (milliseconds, signed 64-bit), now, binary or string
UDTs                    proplist or map, depending on the maps option
uuid                    binary, new
varchar                 binary, string
varint                  integer (arbitrary precision)
timeuuid                binary, now
inet                    {X1, X2, X3, X4} (IPv4), {Y1, Y2, Y3, Y4, Y5, Y6, Y7, Y8} (IPv6), string or binary
list                    list
map                     proplist or map, depending on the maps option
set                     list
smallint                integer
text                    binary, string
tinyint                 integer
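
Two of the mappings above need a small conversion when you want plain numbers or dates: decimal values are {Unscaled, Scale} tuples meaning Unscaled * 10^(-Scale), and timestamp values are milliseconds since the Unix epoch (UTC). The helpers below are a hypothetical sketch (cql_types_sketch is not part of CQErl) using only the math and calendar modules from the standard library; converting a decimal to a float can lose precision.

```erlang
-module(cql_types_sketch).
-export([decimal_to_float/1, datetime_to_ms/1, ms_to_datetime/1]).

%% Gregorian seconds at 1970-01-01 00:00:00, i.e.
%% calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}).
-define(UNIX_EPOCH_SECONDS, 62167219200).

%% Hypothetical helper: {Unscaled, Scale} means Unscaled * 10^(-Scale).
%% Keep the tuple if you need exact arithmetic.
decimal_to_float({Unscaled, Scale}) ->
    Unscaled / math:pow(10, Scale).

%% Hypothetical helper: calendar:datetime() (UTC) to a timestamp value.
datetime_to_ms(DateTime) ->
    (calendar:datetime_to_gregorian_seconds(DateTime) - ?UNIX_EPOCH_SECONDS) * 1000.

%% Hypothetical helper: timestamp value back to calendar:datetime() (UTC).
%% Drops the sub-second part; intended for timestamps at or after the epoch.
ms_to_datetime(Ms) ->
    calendar:gregorian_seconds_to_datetime(Ms div 1000 + ?UNIX_EPOCH_SECONDS).
```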

Connecting to older Cassandra instances

By default, this client library assumes it is talking to a Cassandra 2.2+ or 3+ instance. Cassandra 2.1.x does not support the latest native protocol (v4), which is required to use some of the newest data types and optimizations. To tell CQErl to use the older protocol version (v3), which is required to connect to a 2.1.x instance of Cassandra, set the protocol_version option to the integer 3 in your configuration file, i.e.:

[
  {cqerl, [
            {cassandra_nodes, [ { "127.0.0.1", 9042 } ]},
            {protocol_version, 3}
          ]}
].

or in a cqerl:get_client/2 call:

{ok, Client} = cqerl:get_client("127.0.0.1:9042", [{protocol_version, 3}, {keyspace, oper}]).

Installation

Just include this repository in your project's rebar.config file and run ./rebar get-deps. See the rebar documentation for more details on using rebar for Erlang project management.

Compatibility

As said earlier, this library uses Cassandra's newest native protocol versions (v4, or v3 optionally), which is said to perform better than the older Thrift-based interface. It also speaks CQL version 3, and uses new features available in Cassandra 2.X, such as paging, parametrization, query preparation and so on.

In practice, this means the library works with Cassandra 2.1.x (2.2+ or 3+ recommended), configured to enable the native protocol. The Cassandra documentation gives details on how to configure this protocol. In the cassandra.yaml configuration file of your Cassandra installation, start_native_transport needs to be set to true, and you need to take note of the value of native_transport_port, which is the port used by this library.

Tests

CQErl includes a test suite that you can run yourself, especially if you plan to contribute to this project.

  1. Clone this repo on your machine
  2. Edit test/test.config and put in your own Cassandra configuration
  3. At the project's top directory, run make test

cqerl's People

Contributors

aihturie, bernardd, bettio, bitchslap, bluesbettle, bvjebin, chernetsov, chrzaszcz, chwevans, dcheckoway, drf, fenek, janin, kanarde, kdisneur, lpgauth, matehat, nwt, olcai, portasynthinca3, rbino, rslota, thejerf, uglytroll, waisbrot

cqerl's Issues

Binding fields with multiple conditions

My apologies if this feature already exists - I've been through the docs and the code and couldn't see how to do it.

For the query:

SELECT * FROM table WHERE time < ? AND time > ?

The time field is used twice with different values. Is there any way to specify the two different values in the cqerl_query.values list/map we pass into cqerl:run_query()? Or are we currently limited to just hardcoding those values into the query statement (presumably meaning the query can't be cached)?

Cache

How can I disable the cache?

Provide Elixir code samples

It would be great if all the code samples could also be translated to Elixir, to make it easier for Elixir newbies to start using cqerl (many of us are unfamiliar with Erlang syntax and sometimes stumble trying to convert it).

Thank you, much appreciated

upgrade snappy-erlang for R17

In the rebar.config for this project, a "local" (stored in your GitHub account) copy of snappy-erlang is specified, which was done a few months ago as part of 6632014. This seems to have itself been a clone of pmembrey's repository. These commits have since been upstreamed to fdmanana's repository, and additionally the rebar.config has been updated for R17 (yes, I mean the three character change ;P). I am therefore hoping that either the local copy of snappy-erlang can be updated with R17 support, or the URL in the configuration can be changed back to the upstream repository (which is part of why I consider this an issue to file on cqerl, the other part being that the local copy only exists because of cqerl).

C* events

How to register to the C* events e.g. status_change & topology_change?

Is it just that after authentication you just need to hit the register_frame functionality and handle the response?

gen_fsm timeouts

d0rc@1c17e77 - this is what I had to do to keep it from failing again and again.
If you want, I can add a corresponding option to the application env, like default_timeout.

uuid:get_v1 has changed to support Erlang 18

Hi,

uuid master has a backwards incompatible change:
okeuday/uuid@04a3037

In cqerl_datatypes (line 257), change:

encode_data({timeuuid, now}, _Query) ->
    uuid:get_v1(uuid:new(self(), os));

to:

encode_data({timeuuid, now}, _Query) ->
    {Id, _} = uuid:get_v1(uuid:new(self(), os)),
    Id;

Thanks
Ernie

Error when passing a string as uuid value

** (CaseClauseError) no case clause matching: '995d3216-cef6-4306-a10b-ec3c83aeeb0f'
    (cqerl) src/cqerl_datatypes.erl:276: :cqerl_datatypes.encode_data/2
    (stdlib) lists.erl:1238: :lists.map/2
    (cqerl) src/cqerl_processor.erl:67: :cqerl_processor.process/3

How to encode custom types (tuples)?

Hi,

I have the following query:

select * from attrs where owner_id = 'o' and (doc_id, attr_name) in (('d1', 'a'),('d2', 'a'));

I expected it to work like this:

cqerl:run_query(Client, #cql_query{
  statement = "SELECT doc_id, attr_name, attr_val FROM attrs WHERE owner_id = ? AND (doc_id, attr_name) IN ?",
  values = [
    {owner_id, <<"o">>},
    {'in(doc_id,attr_name)', [ [<<"d1">>, <<"a">>], [<<"d2">>, <<"a">>] ]}
  ]
})

But then it gives the following error:

{bad_return_value,
  {bad_param_type_custom,
    {custom,
      <<"org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)">>}}}

So the question:
How to pass those params (tuples) so they will be encoded correctly?

More specific compatibility guide? new_client failing

Thanks for the great work with cqerl.

Could you please comment a little more as to the compatibility guide? I am trying to integrate cqerl with an erlang/elixir (1.0) project and cassandra 2.0.9 or cassandra 2.1 (I have tried both cassandra versions locally)

When I attempt to grab a new client with iex, it just dies with

      {Ok, Client} = :cqerl.new_client({})
      gen_server.call(:cqerl, {:get_client, {{127, 0, 0, 1}, 9042}, []})
        ** (EXIT) no process

I also attempted to run the Erlang tests directly with make test and am seeing similar issues.

C* is up and running and am able to access this with my scala code, also using CQL and port 9042, etc.

Thanks in advance and again, thank you for the great work.

After make test I got an error like this:

ERROR: erl -noshell -pa "/home/hari/newcass/cqerl-master/ebin" "/home/hari/newcass/cqerl-master/deps/pooler/ebin" "/home/hari/newcass/cqerl-master/deps/uuid/ebin" "/home/hari/newcass/cqerl-master/deps/semver/ebin" "/home/hari/newcass/cqerl-master/deps/lz4/ebin" "/home/hari/newcass/cqerl-master/deps/snappy/ebin" "/home/hari/newcass/cqerl-master/rebar" "/home/hari/newcass/cqerl-master/." -include "/home/hari/newcass/cqerl-master/include" -name test@haric -logdir "/home/hari/newcass/cqerl-master/logs" -env TEST_DIR "/home/hari/newcass/cqerl-master/test" -s ct_run script_start -s erlang halt -ct_config test/test.config -dir test >> /home/hari/newcass/cqerl-master/logs/raw.log 2>&1 failed with error: 1 and output:

Improve handling of variable bindings with collection types

Right now, values for variable bindings are provided by a property list in the values field of a #cql_query{} record. For a query of this kind:

UPDATE entries SET map_col['k1'] = ?, map_col['k2'] = ? WHERE id = 1;

Both variables will be expected to correspond to entries in the property list having the key value(map_col). The current implementation simply looks for the first entry found with that key, so this sort of query cannot be honored.
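
The limitation comes from how proplists:get_value/2 behaves: it always returns the first entry for a key. One possible approach, shown here as a hypothetical sketch rather than CQErl's implementation, is to consume one entry per placeholder, in statement order, with lists:keytake/3:

```erlang
-module(binding_order_sketch).
-export([take_in_order/2]).

%% Hypothetical helper: Names lists the binding name of each placeholder in
%% statement order; each placeholder removes and uses the next entry with
%% that key, so repeated keys get their values in order.
take_in_order(Names, Values) ->
    {Taken, _Rest} =
        lists:foldl(
          fun(Name, {Acc, Remaining}) ->
                  {value, {Name, V}, Remaining1} = lists:keytake(Name, 1, Remaining),
                  {[V | Acc], Remaining1}
          end,
          {[], Values},
          Names),
    lists:reverse(Taken).
```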

Doesn't seem to scale...

Hi, I tried really hard to use your client... I didn't feel like writing a new one, but it just doesn't scale to my needs. I found at least 3 bottlenecks aka single process doing work for all queries.

Here's an example of high load on your client:

(rtb-gateway@h101)5> process_info(whereis(cqerl), message_queue_len).
{message_queue_len,383675}

Obviously the gen_server times out and I can never get a client.

cqerl:new_client() returns timeout when heavily invoked

I've played with cqerl and everything was fine, except when I ran hundreds of requests concurrently. It seems that cqerl is a generic server and a new_client request is a blocking call (handle_call). Because of this, if the generic server is busy doing other work and its queue holds other pending commands, new_client times out...
I've also played with different pool_min_size and pool_max_size values, but still get the same issue...
Is it a good approach that retrieving the worker from the pool and executing other commands are both done in the same gen_server?

0.9.0 error conditions

@bernardd

I don't know if I'm missing something, but the pull request you had open does not make any test pass. The very first task of create_keyspace fails repeatedly with thousands of

*** System report during integrity_SUITE:create_keyspace/1 in initial 2016-02-13 20:14:21.021 ***

=SUPERVISOR REPORT==== 13-Feb-2016::20:14:21 ===
     Supervisor: {local,pooler_g2gDaARhf2EAYQBhAWIAACNSZAAVYW5vdGhlcl9mYWtlX2tleXNwYWNl_member_sup}
     Context:    child_terminated
     Reason:     {server_error,8704,
                               <<"Keyspace 'another_fake_keyspace' does not exist">>}
     Offender:   [{pid,<0.2751.0>},
                  {id,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]

Were the tests in the pull request, in a passing state?

TCP frame fragmentation issue

The issue was described as comments in 1388c90 and reported by @bitchslap:


Could this still be an issue? (Please ignore the line numbers, because I have prints in the sources.)
I'm seeing these sort of errors with really high request (a lot more than 5K/s) rates:

{error,
    {function_clause,
        [{cqerl_protocol,decode_response_term,
             [{cqerl_frame,false,undefined,false,8,75},
              <<129,0,71,8>>],
             [{file,"src/cqerl_protocol.erl"},
              {line,486}]},
         {cqerl_protocol,response_frame,2,
             [{file,"src/cqerl_protocol.erl"},
              {line,469}]},
         {cqerl_client,handle_info,3,
             [{file,"src/cqerl_client.erl"},
              {line,392}]},
         {gen_fsm,handle_msg,7,
             [{file,"gen_fsm.erl"},{line,494}]},
         {proc_lib,init_p_do_apply,3,
             [{file,"proc_lib.erl"},{line,227}]}]}}

It seems the cqerl_protocol:response_frame(....) in "live" receives
base_frame: {cqerl_frame,false,undefined,false,undefined,0}
binary <<129,0,75,8,0,0,0,4>>
Size 4
Body <<>>
for the first segment and returns {delay, Binary}.

Following the workflow, this then triggers activate_socket(?STATE_FROM_RESP(Resp1)) and tries to append the delayed segment. But for some reason the segment gets thrown away, and the process ends up crashing.

Delayed0 is:
<<129,0,75,8,0,0,0,4>>

BinaryMsg (you can see that the 0,0,0,1 is there to grab)
<<0,0,0,1,129,0,71,8,0,0,0,4,0,0,0,1,
129,0,61,8,0,0,0,4,0,0,0,1,129,0,80,8,
0,0,0,4,0,0,0,1,129,0,66,8,0,0,0,4,0,
0,0,1,129,0,87,8,0,0,0,4,0,0,0,1,129,
0,84,8,0,0,0,4,0,0,0,1,129,0,78,8,0,0,
0,4,0,0,0,1,129,0,89,8,0,0,0,4,0,0,0,
1,129,0,85,8,0,0,0,4,0,0,0,1,129,0,72,
8,0,0,0,4,0,0,0,1,129,0,58,8,0,0,0,4,
0,0,0,1,129,0,92,8,0,0,0,4,0,0,0,1,
129,0,70,8,0,0,0,4,0,0,0,1>>

On the next rotation the 0,0,0,1 from the beginning of the BinMsg has disappeared
Delayed0:
<<129,0,75,8,0,0,0,4>>

BinaryMsg (next round the 0,0,0,1 is missing so it must have been dropped along the way)
<<129,0,71,8,0,0,0,4,0,0,0,1,129,0,61,8,
0,0,0,4,0,0,0,1,129,0,80,8,0,0,0,4,0,
0,0,1,129,0,66,8,0,0,0,4,0,0,0,1,129,
0,87,8,0,0,0,4,0,0,0,1,129,0,84,8,0,0,
0,4,0,0,0,1,129,0,78,8,0,0,0,4,0,0,0,
1,129,0,89,8,0,0,0,4,0,0,0,1,129,0,85,
8,0,0,0,4,0,0,0,1,129,0,72,8,0,0,0,4,
0,0,0,1,129,0,58,8,0,0,0,4,0,0,0,1,
129,0,92,8,0,0,0,4,0,0,0,1,129,0,70,8,
0,0,0,4,0,0,0,1>>

and the decode_response_term crashes

=ERROR REPORT==== 13-Feb-2014::12:14:59 ===
** State machine <0.123.0> terminating
** Last message in was {tcp,#Port<0.1966>,
                            <<0,0,0,1,129,0,71,8,0,0,0,4,0,0,0,1,129,0,61,8,0,
                              0,0,4,0,0,0,1,129,0,80,8,0,0,0,4,0,0,0,1,129,0,
                              66,8,0,0,0,4,0,0,0,1,129,0,87,8,0,0,0,4,0,0,0,1,
                              129,0,84,8,0,0,0,4,0,0,0,1,129,0,78,8,0,0,0,4,0,
                              0,0,1,129,0,89,8,0,0,0,4,0,0,0,1,129,0,85,8,0,0,
                              0,4,0,0,0,1,129,0,72,8,0,0,0,4,0,0,0,1,129,0,58,
                              8,0,0,0,4,0,0,0,1,129,0,92,8,0,0,0,4,0,0,0,1,
                              129,0,70,8,0,0,0,4,0,0,0,1>>}
** When State == live
**      Data  == {client_state,cqerl_auth_plain_handler,undefined,undefined,
                     {{XXX,XXX,XXX,XXX},9042},
                     tcp,#Port<0.1966>,undefined,testdatabase,10000,
                     <<129,0,75,8,0,0,0,4>>,
                     102441,
                     [{0,undefined},
                      {1,undefined},
                      {2,undefined}.......
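
The dumps above show a framing problem: the delayed 8-byte header <<129,0,75,8,0,0,0,4>> announces a 4-byte body, and that body (0,0,0,1) is the start of the next TCP packet, so it must be appended to the delayed bytes before decoding. A minimal sketch of fragment-safe buffering follows. It is hypothetical (not the fix applied to cqerl) and assumes the 8-byte header layout of native protocol v1/v2 seen in these bytes (version, flags, 1-byte stream id, opcode, 32-bit body length); protocol v3 and later use a 2-byte stream id, giving a 9-byte header.

```erlang
-module(frame_buffer_sketch).
-export([split_frames/1]).

%% Hypothetical helper: splits a receive buffer (delayed bytes ++ new packet)
%% into complete {Version, Flags, Stream, Opcode, Body} frames and returns
%% any trailing partial frame to be kept for the next packet.
split_frames(Buffer) ->
    split_frames(Buffer, []).

split_frames(<<Version, Flags, Stream, Opcode, Length:32, Rest/binary>> = Buffer, Acc) ->
    case Rest of
        <<Body:Length/binary, Tail/binary>> ->
            split_frames(Tail, [{Version, Flags, Stream, Opcode, Body} | Acc]);
        _ ->
            %% Body not fully received yet: keep the whole frame, header included.
            {lists:reverse(Acc), Buffer}
    end;
split_frames(Buffer, Acc) ->
    %% Fewer than 8 bytes: not even a full header yet.
    {lists:reverse(Acc), Buffer}.
```

The key point is that nothing is ever discarded: whatever cannot be decoded yet is returned whole and prepended to the next packet.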

crashes if left working without queries for a day or two

Like this:

** (exit) exited in: :gen_fsm.sync_send_event(#PID<0.5513.0>, {:send_query, #Reference<0.0.152.204801>, {:cql_query, "select * from test limit 1", [], :undefined, false, 100, :undefined, 4, :undefined}})
    ** (EXIT) time out
    (stdlib) gen_fsm.erl:208: :gen_fsm.sync_send_event/2

Restarting helps:

application:stop(cqerl).
application:start(cqerl).

CPU load is near zero. etop also reveals nothing interesting... My guess is that connections from the pool are dying. But I'm not sure..

Performance and fault-tolerance issue

When I was trying to fire ~100 query/s, the cqerl gen_server itself crashed because of the client's timeout:

CRASH REPORT Process <0.709.0> with 0 neighbours exited with reason: {timeout,{gen_fsm,sync_send_event,[<0.124.0>,{send_query,#Ref<0.0.0.4893>,{cql_query,<<"INSERT INTO...............................">>.............

After this crash, no matter how gently I send requests (like 2 reqs/s), only a few of them are handled, followed by more crashes.

This issue is reproducible like 100%, not sure how did this happen, but my Cassandra server looks fine when the crash happened.

DSL

This is a proposal to add DSL-like functions to cqerl to make it more developer-friendly to manipulate queries & variable bindings.

Ideas:

cqerl:select(Client, #cql_select_statement{
    table=users,
    rows=[username, password],
    where=[{id, UserID}]
}).

-record(userdata, {
    username,
    password
}).

cqerl:select(Client, #cql_select_statement{
    table=users,
    rows=userdata,
    where=[{id, UserID}]
}).
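To make the proposal concrete, here is a minimal sketch of how such a record might be translated into a CQL statement plus a values list. The record is the hypothetical one from the proposal (not a cqerl type), and the resulting pair is assumed to be fed into a #cql_query{} afterwards.

```erlang
-module(cql_dsl_sketch).
-export([to_cql/1]).

%% Hypothetical record from the proposal above; not defined by cqerl.
-record(cql_select_statement, {table, rows = [], where = []}).

%% Turns a select description into {StatementBinary, Values}, using "?"
%% bind markers and keeping the where-clause pairs as named values.
to_cql(#cql_select_statement{table = Table, rows = Rows, where = Where}) ->
    Columns = case Rows of
                  [] -> "*";
                  _  -> lists:join(", ", [atom_to_list(C) || C <- Rows])
              end,
    Filter = case Where of
                 [] -> "";
                 _  -> [" WHERE ",
                        lists:join(" AND ", [atom_to_list(K) ++ " = ?" || {K, _} <- Where])]
             end,
    Statement = lists:flatten(["SELECT ", Columns, " FROM ", atom_to_list(Table), Filter]),
    {list_to_binary(Statement), Where}.
```

lists:join/2 needs OTP 19 or later. Supporting the rows=userdata variant would additionally need the record's field names, which Erlang only exposes at compile time via record_info(fields, userdata).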

{:ok, client} = :cqerl.new_client({"52.**.***.*",9042}) - this format doesn't work

I tested the following two lines of code in the Elixir shell (iex):

{:ok, client} = :cqerl.new_client({"52.**.***.*",9042}) #Doesn't work
{:ok, client} = :cqerl.new_client({{52,**,***,*},9042}) #Works

The environment I am using is:
Elixir 1.0.5
OTP 17
The server is on AWS, and I used the following configuration to make Cassandra reachable from remote servers:

  • Comment rpc_address
  • Uncomment rpc_interface [rpc_interface: eth0]
  • Uncomment rpc_interface_prefer_ipv6: false
  • Enable thrift using nodetool [nodetool enablethrift]

After this configuration change, I could no longer connect with the default settings; the command
{:ok, client} = :cqerl.new_client({})
started failing with:
no match of right hand side value: {:error, :no_available_clients}

So I tried connecting via the server's public IP address, but the string form (the first command above) failed with the following error:
{:ok, client} = :cqerl.new_client({"52.**.***.*",9042})


** (FunctionClauseError) no function clause matching in :cqerl.prepare_node_info/1
    (cqerl) src/cqerl.erl:599: :cqerl.prepare_node_info({"52.35.128.1", 9042})
    (cqerl) src/cqerl.erl:102: :cqerl.new_client/2
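The trace shows that prepare_node_info/1 has no clause matching this address form. Note that in Elixir a double-quoted string is a binary, whereas an Erlang string is a charlist. As a workaround sketch (not part of cqerl; module and function names are hypothetical), the address can be normalized to a tuple with inet:parse_address/1 before calling new_client:

```erlang
-module(node_address_sketch).
-export([normalize_node/1]).

%% Hypothetical helper: turns {"10.0.0.5", 9042} or {<<"10.0.0.5">>, 9042}
%% into {{10,0,0,5}, 9042}, so only tuple addresses reach the client.
normalize_node({Ip, Port}) when is_tuple(Ip), is_integer(Port) ->
    {ok, {Ip, Port}};
normalize_node({Ip, Port}) when is_binary(Ip) ->
    normalize_node({binary_to_list(Ip), Port});
normalize_node({Ip, Port}) when is_list(Ip), is_integer(Port) ->
    case inet:parse_address(Ip) of
        {ok, Addr}      -> {ok, {Addr, Port}};
        {error, einval} -> {error, {bad_address, Ip}}
    end.
```

From iex this would be {:ok, node} = :node_address_sketch.normalize_node({"10.0.0.5", 9042}) followed by :cqerl.new_client(node). inet:parse_address/1 only parses literal addresses; a hostname would need a resolver call such as inet:getaddr/2 instead.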

Unhandled handle_call return in handle_info({retry, Msg, From, Delay}, State)

The handle_call({get_client, ...}) clause can also return {noreply, NewState}, which happens when a retry tries to get a new client but receives no_available_clients under load.

Currently, handle_info({retry, ...}) only handles the retry and {reply, Client, State} returns.

The {noreply, NewState} return is not handled in the retry path. In that case, shouldn't it also return the newly obtained client pid?

Also, in handle_call({get_client, ...}):

On no_available_clients, the delayed request should be scheduled with the Node value, not NodeKey, because handle_call derives the proper NodeKey itself through node_key(Node) when handling requests.

erlang:send_after(?RETRY_INITIAL_DELAY, self(), {retry, {get_client, NodeKey, Opts}, From, ?RETRY_INITIAL_DELAY})
should be
erlang:send_after(?RETRY_INITIAL_DELAY, self(), {retry, {get_client, Node, Opts}, From, ?RETRY_INITIAL_DELAY})
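A minimal sketch of the retry pattern argued for here, using a toy gen_server rather than cqerl's own code: the retry message carries the original request (so nothing needs re-deriving), and handle_info/2 handles both the {reply, ...} and {noreply, ...} outcomes, answering the still-blocked caller with gen_server:reply/2 once a value is available. Module, function, and macro values are hypothetical.

```erlang
-module(retry_sketch).
-behaviour(gen_server).
-export([start_link/0, fetch/2, store/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(RETRY_INITIAL_DELAY, 10).   % ms, hypothetical values
-define(RETRY_MAX_DELAY, 1000).

start_link() -> gen_server:start_link(?MODULE, #{}, []).

%% Blocks until Key has a value (or the default 5 s call timeout fires).
fetch(Pid, Key) -> gen_server:call(Pid, {fetch, Key}).
store(Pid, Key, Value) -> gen_server:cast(Pid, {store, Key, Value}).

init(State) -> {ok, State}.

handle_call({fetch, Key}, From, State) ->
    lookup(Key, From, ?RETRY_INITIAL_DELAY, State).

handle_cast({store, Key, Value}, State) ->
    {noreply, State#{Key => Value}}.

%% Both outcomes of lookup/4 are handled on the retry path.
handle_info({retry, {fetch, Key}, From, Delay}, State) ->
    case lookup(Key, From, min(Delay * 2, ?RETRY_MAX_DELAY), State) of
        {reply, Reply, NewState} ->
            gen_server:reply(From, Reply),  % caller is still blocked in call/2
            {noreply, NewState};
        {noreply, NewState} ->
            {noreply, NewState}             % another retry has been scheduled
    end;
handle_info(_Other, State) ->
    {noreply, State}.

lookup(Key, From, Delay, State) ->
    case maps:find(Key, State) of
        {ok, Value} ->
            {reply, {ok, Value}, State};
        error ->
            %% Resend the original request, not a derived key.
            erlang:send_after(Delay, self(), {retry, {fetch, Key}, From, Delay}),
            {noreply, State}
    end.
```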

Getting the following error

Hi,

I tried cqerl:new_client({}). in the Erlang shell and got the following error. Do you see any potential reasons?

 cqerl:new_client({}).
== Starting CQErl frontend. ==
00:55:20.748 [debug] Supervisor pooler_g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk_pool_sup started pooler_pooled_worker_sup:start_link({pool,g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk,cqerl,5,2,{cqerl_client,start_link,[{{127,0,0,1},...},...]},...}) at pid <0.543.0>
** exception exit: {{noproc,{gen_server,call,
                                        [g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk,
                                         {take_member,0},
                                         infinity]}},
                    {gen_server,call,
                                [cqerl,{get_client,{{127,0,0,1},9042},[]}]}}
     in function  gen_server:call/2 (gen_server.erl, line 204)
(ejabberd@L-9LCS022)2> 00:55:20.748 [error] Supervisor pooler_g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk_pool_sup had child undefined started with pooler:start_link({pool,g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk,cqerl,5,2,{cqerl_client,start_link,[{{127,0,0,1},...},...]},...}) at undefined exit with reason {'EXIT',{function_clause,[{pooler,start_link,[{pool,g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk,cqerl,5,2,{cqerl_client,start_link,[{{127,0,0,1},9042},[{auth,{cqerl_auth_plain_handler,[]}},{ssl,false},{sleep_duration,{15.0,sec}},{keyspace,undefined}]]},[],0,0,1,{1,min},{15.0,sec},undefined,undefined,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},...},...}],...},...]}} in context start_error
00:55:20.748 [error] gen_server cqerl terminated with reason: no such process or port in call to gen_server:call(g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk, {take_member,0}, infinity) in gen_server:call/3 line 212
00:55:20.748 [error] CRASH REPORT Process cqerl with 0 neighbours exited with reason: no such process or port in call to gen_server:call(g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk, {take_member,0}, infinity) in gen_server:terminate/7 line 826
00:55:20.748 [error] Supervisor cqerl_sup had child undefined started with cqerl:start_link() at <0.147.0> exit with reason no such process or port in call to gen_server:call(g2gDaARhf2EAYQBhAWIAACNSZAAJdW5kZWZpbmVk, {take_member,0}, infinity) in context child_terminated
00:55:20.748 [debug] Supervisor cqerl_sup started cqerl:start_link() at pid <0.544.0>

Type specs are incorrect for run_query

The type spec for run_query suggests its result is #cql_result{}; however, it actually returns {ok, #cql_result{}} | {error, error()} | {ok, void}, where error() seems to be {pos_integer(), binary(), term()}.
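Based on the return shapes listed above, a corrected spec and a caller that matches all of them might look like the sketch below. The empty cql_result record is a stand-in for cqerl's real record, and the cql_error() type name is an assumption.

```erlang
-module(run_query_result_sketch).
-export([handle_result/1]).

%% Empty stand-in for cqerl's #cql_result{} record (the real one has fields).
-record(cql_result, {}).

-type cql_error() :: {pos_integer(), binary(), term()}.
-type run_query_result() :: {ok, void} | {ok, #cql_result{}} | {error, cql_error()}.

%% A caller has to handle all three shapes, not just #cql_result{}.
-spec handle_result(run_query_result()) ->
          no_rows | {rows, #cql_result{}} | {failed, pos_integer(), binary()}.
handle_result({ok, void}) -> no_rows;
handle_result({ok, #cql_result{} = Result}) -> {rows, Result};
handle_result({error, {Code, Message, _Details}}) -> {failed, Code, Message}.
```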

Doesn't work with Cassandra 3.0.2

When I try to create a new client against Cassandra 3.0.2 using:

{:ok, client} = :cqerl.new_client({})

it gives the following error:

** (exit) exited in: :gen_server.call(:cqerl, {:get_client, {{127, 0, 0, 1}, 9042}, []})
    ** (EXIT) time out
    (stdlib) gen_server.erl:182: :gen_server.call/2

However, the same call works fine with Cassandra 2.2.4.

cqerl:new_client() returns timeout when invoked heavily

Hi,
I've played with cqerl. Everything was fine except that when I increased the pool size (pool_min_size to 10 and pool_max_size to 20), I often received a timeout when asking for a new client with cqerl:new_client(). Is a blocking gen_server call the right approach for the new_client() function? I have thousands of clients calling cqerl:new_client(), and many of them got a timeout error :-(. For heavy-load scenarios with many calls to new_client(), I'm not sure gen_server:call is a good approach. Am I missing something?
Thanks,
Eddie
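Until the allocation path changes, a caller-side workaround could be to retry on the {timeout, _} exit that gen_server:call raises. A minimal sketch (hypothetical helper, not part of cqerl) with exponential backoff; on the last attempt the exit propagates unchanged:

```erlang
-module(call_retry_sketch).
-export([with_retry/3]).

%% Runs Fun(). If it exits with a gen_server-style {timeout, _} reason,
%% sleeps DelayMs and retries, doubling the delay, up to Attempts calls
%% in total. When no catch clause matches, the exit is re-raised as-is.
with_retry(Fun, Attempts, DelayMs) when Attempts >= 1 ->
    try
        Fun()
    catch
        exit:{timeout, _} when Attempts > 1 ->
            timer:sleep(DelayMs),
            with_retry(Fun, Attempts - 1, DelayMs * 2)
    end.
```

Usage would be along the lines of call_retry_sketch:with_retry(fun() -> cqerl:new_client() end, 3, 100). This only smooths over bursts; it does not remove the single-process bottleneck the question is about. On older OTP releases, a reply that arrives after a call timeout can also end up as a stray message in the caller's mailbox.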

How to use this project in my ejabberd server?

I need to store conversation messages in a Cassandra database, so I downloaded your project and installed it on my system using the commands from this project's installation steps. But I get errors when I run it, as follows.

root@livin_pc:/home/livin/Desktop/Cassandra/cqerl-master#   make .

It compiles the project's .erl files successfully.

It throws error messages while running the tests, as follows:

root@livin_pc:/home/livin/Desktop/Cassandra/cqerl-master# make test.

=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.959.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 265
  neighbours:

=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.955.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.957.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.958.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.959.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]
"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.976.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 269
  neighbours:
"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]
"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.981.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 269
  neighbours:

=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.982.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 269
  neighbours:

=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.984.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 269
  neighbours:

=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:38 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.985.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 269
  neighbours:

=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.976.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.981.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.982.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.984.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:38 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.985.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=ERROR REPORT==== 15-Feb-2014::14:45:38 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=CRASH REPORT==== 15-Feb-2014::14:45:39 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.1001.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 253
  neighbours:

- - - - - - - - - - - - - - - - - - - - - - - - - -
load_SUITE:init_per_suite failed
Reason: {badmatch,{error,no_available_clients}}
- - - - - - - - - - - - - - - - - - - - - - - - - -


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:39 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.1001.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=ERROR REPORT==== 15-Feb-2014::14:45:39 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]
"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=ERROR REPORT==== 15-Feb-2014::14:45:39 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]
"1"
{192,168,1,64}
9042
[{active,false},{mode,binary}]

=CRASH REPORT==== 15-Feb-2014::14:45:39 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.1007.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 261
  neighbours:

=ERROR REPORT==== 15-Feb-2014::14:45:39 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:39 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.1008.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 261
  neighbours:

=ERROR REPORT==== 15-Feb-2014::14:45:39 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:39 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.1009.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 261
  neighbours:

=SUPERVISOR REPORT==== 15-Feb-2014::14:45:39 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.1007.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:39 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.1008.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 15-Feb-2014::14:45:39 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.1009.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=ERROR REPORT==== 15-Feb-2014::14:45:39 ===
pool 'g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk' failed to start member: {error,
                                                                         {connection_error,
                                                                          econnrefused}}
=CRASH REPORT==== 15-Feb-2014::14:45:39 ===
  crasher:
    initial call: cqerl_client:init/1
    pid: <0.1003.0>
    registered_name: []
    exception exit: {connection_error,econnrefused}
      in function  gen_fsm:init_it/6 (gen_fsm.erl, line 371)
    ancestors: [pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup,
                  pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_pool_sup,
                  pooler_sup,<0.176.0>]
    messages: []
    links: [<0.221.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 377
    stack_size: 24
    reductions: 265
  neighbours:

=SUPERVISOR REPORT==== 15-Feb-2014::14:45:39 ===
     Supervisor: {local,
                                           pooler_g2gDaARhwGGoYQFhQGIAACNSZAAJdW5kZWZpbmVk_member_sup}
     Context:    child_terminated
     Reason:     {connection_error,econnrefused}
     Offender:   [{pid,<0.1003.0>},
                  {name,cqerl_client},
                  {mfargs,{cqerl_client,start_link,undefined}},
                  {restart_type,temporary},
                  {shutdown,brutal_kill},
                  {child_type,worker}]

Testing Cassandra.cqerl-master: *** FAILED *** init_per_suite
Testing Cassandra.cqerl-master: TEST COMPLETE, 1 ok, 3 failed, 11 skipped of 15 test cases

Updating /home/livin/Desktop/Cassandra/cqerl-master/logs/index.html... done
Updating /home/livin/Desktop/Cassandra/cqerl-master/logs/all_runs.html... done

ERROR: One or more tests failed
make: *** [test] Error 1.  

I tried many times, but I could not make it run properly and got the errors shown above. Could you suggest how I can use your project, and give me the proper steps to use it with my ejabberd server source files?
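Every failure in this log is {connection_error, econnrefused} for {192,168,1,64} on port 9042: the TCP connection to that address and port was refused, after which the suite fails with no_available_clients. Before digging into cqerl, it may help to confirm that Cassandra's native transport is actually listening there. A minimal check using only gen_tcp (module name hypothetical):

```erlang
-module(cassandra_reachability_sketch).
-export([check/2, check/3]).

check(Host, Port) -> check(Host, Port, 2000).

%% Returns ok if a plain TCP connection to Host:Port succeeds within
%% TimeoutMs, or {error, Reason} (econnrefused, timeout, ...) otherwise.
%% It does not speak the CQL protocol; it only tests reachability.
check(Host, Port, TimeoutMs) ->
    case gen_tcp:connect(Host, Port, [binary, {active, false}], TimeoutMs) of
        {ok, Socket} ->
            ok = gen_tcp:close(Socket);
        {error, Reason} ->
            {error, Reason}
    end.
```

If cassandra_reachability_sketch:check({192,168,1,64}, 9042) also returns {error, econnrefused}, the likely next step is to check that Cassandra is running and that its rpc_address setting lets it listen on that interface.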

Client allocation

Despite pre-allocating a client pool of size pool_min_size, repeated calls to cqerl:get_client would only ever return the first allocated client, resorting to another member of the pool only when the first one became busy (60% of its allowed simultaneous requests in flight).

Instead, when a pool of clients is pre-allocated with a minimal size, all of them should be used to their fullest. When all of them are considered busy, new clients should be spawned, and freed again once they have been idle for a given time.
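A sketch of the proposed policy as a pure selection function: among all pool members below the 60% busy threshold, pick the least-loaded one, and signal all_busy when none qualifies (the caller would then spawn a new client, not shown). The {Pid, InFlight} representation and the names are assumptions for illustration.

```erlang
-module(client_pick_sketch).
-export([pick/2]).

%% Clients: [{Pid, InFlight}]; MaxInFlight: per-client request cap.
%% A client counts as busy once InFlight reaches 60% of MaxInFlight.
pick(Clients, MaxInFlight) ->
    Threshold = MaxInFlight * 0.6,
    case [C || {_, InFlight} = C <- Clients, InFlight < Threshold] of
        [] ->
            all_busy;
        [First | Rest] ->
            {Pid, _} = lists:foldl(fun least_loaded/2, First, Rest),
            {ok, Pid}
    end.

%% Keeps whichever of the candidate and the current best has fewer requests.
least_loaded({_, N} = Candidate, {_, Best}) when N < Best -> Candidate;
least_loaded(_Candidate, Current) -> Current.
```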

Killing all Cassandra server connections results in invalid prepared statement errors

Hi,

After restarting cassandra, I see this:
{[{reason,{badmatch,{error,{9472,<<"Prepared query with ID 83cc6bd49f5e0200e9f995f5e459f8c0 not found (either the query was not prepared on this host (maybe the host has been restarted?) or you have prepared too many queries and it has been evicted from the internal cache)">>,<<131,204,107,212,159,94,2,0,233,249,149,245,228,89,248,192>>}}}}

Perhaps the statement should be prepared again and the query retried?
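Error code 9472 is 0x2500, which the native protocol specification defines as Unprepared; its body carries the unknown query id (the last binary in the tuple above). A sketch of the suggested behaviour, with the execute and prepare steps passed in as funs (hypothetical, not cqerl's API): on Unprepared, prepare once more and retry a single time.

```erlang
-module(reprepare_sketch).
-export([run_with_reprepare/2]).

-define(UNPREPARED, 16#2500).  % 9472

%% Execute() returns the query result; Prepare() re-prepares the statement
%% on the current host and returns ok or {error, _}. Only one retry is made.
run_with_reprepare(Execute, Prepare) ->
    case Execute() of
        {error, {?UNPREPARED, _Message, _QueryId}} ->
            case Prepare() of
                ok -> Execute();
                {error, _} = PrepareError -> PrepareError
            end;
        Other ->
            Other
    end.
```

In a real client, Prepare would presumably also need to drop any cached query id for that statement before preparing again.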

Compiling project fails

[user@server cqerl-master]# make compile
./rebar compile
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/snappy'
==> snappy (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/snappy'
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/lz4'
==> lz4 (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/lz4'
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/semver'
==> semver (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/semver'
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/uuid'
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/quickrand'
==> quickrand (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/quickrand'
==> uuid (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/uuid'
==> Entering directory `/opt/sources/cqerl2/cqerl-master/deps/pooler'
==> pooler (compile)
==> Leaving directory `/opt/sources/cqerl2/cqerl-master/deps/pooler'
==> cqerl-master (compile)
:48: syntax error before: '='
src/cqerl.erl:144: syntax error before: '('
src/cqerl.hrl:24: syntax error before: '='
src/cqerl.erl:152: syntax error before: 'query'
src/cqerl.hrl:64: syntax error before: '::'
src/cqerl.erl:161: syntax error before: 'query'
make: *** [compile] Error 1

If trying to build only with erlc:
[user@server cqerl-master]# erlc -I include/ src/*.erl
src/cqerl_auth_plain_handler.erl:2: Warning: behaviour cqerl_auth_handler undefined
:48: syntax error before: '='
src/cqerl.hrl:24: syntax error before: '='
src/cqerl.hrl:64: syntax error before: '::'
src/cqerl_batch.erl:11: syntax error before: 'query'
src/cqerl_batch.erl:42: syntax error before: 'query'
src/cqerl_batch.erl:7: function init/3 undefined
src/cqerl_batch.erl:24: function terminate/2 undefined
src/cqerl_batch.erl:29: record cql_query undefined
src/cqerl_batch.erl:21: Warning: function loop/3 is unused

The master.zip was used to get the source files.

Excessive records in ETS table that caches prepared queries?

Hello,

Currently cqerl_cached_queries (the ETS table that caches queries) uses a {pid(), statement()} key. This leads to multiple records per statement: every pool connection creates a record for each statement it calls. This seems excessive, since logically the cache only needs a single entry per statement to decide whether it has already been prepared. It's also a housekeeping problem, as the records don't get removed when a pool process dies for any reason. I don't see any use of the PID part of the key in the code, unless I'm missing something. I've changed the code to use only the statement as the key, and haven't encountered any problems so far.

Please advise. If removing PID from the key sounds like a good idea, I'll submit a pull request for this.

Best regards,
Boris
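A minimal sketch of the proposed layout, with one entry per statement shared by all pooled connections. The module and function names are hypothetical. One caveat: the error quoted in the previous issue says a query may not be prepared "on this host", so if a pool spans several Cassandra nodes, a {Node, Statement} key might be needed instead of the bare statement.

```erlang
-module(prepared_cache_sketch).
-export([new/0, lookup/2, store/3, forget/2]).

%% Unnamed public set table; readers are the pooled connections.
new() ->
    ets:new(prepared_cache_sketch, [set, public, {read_concurrency, true}]).

%% One entry per statement text, regardless of which connection prepared it.
lookup(Table, Statement) ->
    case ets:lookup(Table, Statement) of
        [{_, QueryId}] -> {ok, QueryId};
        []             -> not_found
    end.

%% Inserting again for the same statement overwrites the old entry.
store(Table, Statement, QueryId) ->
    true = ets:insert(Table, {Statement, QueryId}),
    ok.

%% Drops an entry, e.g. after the server reports it as unprepared.
forget(Table, Statement) ->
    true = ets:delete(Table, Statement),
    ok.
```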

Newer data types of cassandra 2.2 not supported

The newer data types of Cassandra 2.2, like tinyint, are not supported: an error is raised when selecting a tinyint column (not tested with the other new data types such as date, smallint, etc.).
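For reference, native protocol v4 encodes tinyint as a 1-byte signed integer, smallint as a 2-byte signed integer, and date as an unsigned 32-bit day count with the Unix epoch placed at 2^31. A decoding/encoding sketch using binary pattern matching; the module name and the choice to return date as a signed day offset from 1970-01-01 are assumptions.

```erlang
-module(cql_v4_types_sketch).
-export([decode/2, encode/2]).

%% date values are an unsigned 32-bit day count where 2^31 is 1970-01-01.
-define(DATE_EPOCH, 16#80000000).

decode(tinyint, <<V:8/big-signed-integer>>) -> V;
decode(smallint, <<V:16/big-signed-integer>>) -> V;
decode(date, <<Days:32/big-unsigned-integer>>) -> Days - ?DATE_EPOCH.

encode(tinyint, V) when is_integer(V), V >= -128, V =< 127 ->
    <<V:8/big-signed-integer>>;
encode(smallint, V) when is_integer(V), V >= -32768, V =< 32767 ->
    <<V:16/big-signed-integer>>;
encode(date, Days) when is_integer(Days) ->
    <<(Days + ?DATE_EPOCH):32/big-unsigned-integer>>.
```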

Can't insert negative decimals or decimals of certain size using cql_query

We're doing a project for collecting and storing sensor data using Elixir (1.1.1) and Cassandra (2.1.11). We're using cqerl (0.8.0) to talk to Cassandra, but have noticed a problem with the decimal type. In short, when the cql_query record is used to define a query, inserting negative decimals fails, and decimals of a certain size are stored as the wrong number (e.g. trying to insert {10357780, 0} stores -6419436 in Cassandra).

Assume a simple table like this:

CREATE TABLE tests.decimals (id INT PRIMARY KEY, value DECIMAL);

If I use a simple query string to insert data like this, it works:

{:ok, client} = :cqerl.new_client
{:ok, result} = :cqerl.run_query(client, "INSERT INTO tests.decimals (id, value) VALUES (10, -10.0);")
{:ok, result} = :cqerl.run_query(client, "INSERT INTO tests.decimals (id, value) VALUES (10, 10357780);")

But if I try to use cql_query, inserting a negative number will fail:

query = cql_query(statement: "INSERT INTO tests.decimals (id, value) VALUES (11, ?);", values: [value: {-10, 0}])
{:ok, client} = :cqerl.new_client
{:ok, result} = :cqerl.run_query(client, query)

That throws the following error:

[error] Process #PID<0.309.0> raised an exception
** (ArithmeticError) bad argument in arithmetic expression
    (stdlib) :math.log(-10)
    (cqerl) src/cqerl_datatypes.erl:342: :cqerl_datatypes.encode_data/2
    (cqerl) src/cqerl_datatypes.erl:314: :cqerl_datatypes.encode_data/2
    (stdlib) lists.erl:1238: :lists.map/2
    (cqerl) src/cqerl_processor.erl:67: :cqerl_processor.process/3

Fixing this is pretty trivial: we just need to call abs/1 on Val before calling math:log/1 (see trarbr@a171272).

However, for numbers of a certain size (positive or negative) it sometimes stores the wrong number (maybe the byte count is wrong?), e.g.:

query = cql_query(statement: "INSERT INTO tests.decimals (id, value) VALUES (11, ?);", values: [value: {10357780, 0}])
{:ok, client} = :cqerl.new_client
{:ok, result} = :cqerl.run_query(client, query)

This will store -6419436 in Cassandra instead of 10357780.
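The observed value is consistent with the byte count being one too small. 10357780 is 0x9E0C14, which fits in three bytes. However, the top bit of the leading byte 0x9E is set, so Cassandra reads a three-byte two's complement varint as 10357780 - 2^24 = -6419436. A positive varint needs an extra leading 0x00 byte whenever its first byte is 0x80 or above. The small sketch below reproduces the arithmetic. The module name is hypothetical.

```erlang
-module(decimal_wrap_demo).
-export([three_byte_roundtrip/1]).

%% Writes Int into exactly 3 bytes (keeping only the low 24 bits), then
%% reads those bytes back as a signed two's complement value, which is
%% how a varint payload is interpreted on the Cassandra side.
three_byte_roundtrip(Int) ->
    <<Back:24/big-signed-integer>> = <<Int:24/big-integer>>,
    Back.
```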

Fixing the issue with negative numbers was pretty easy, but I don't know how to fix the other issue.
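One way to avoid both problems is to compute the byte count directly from two's complement ranges instead of from a logarithm. The v4 protocol defines decimal as a 4-byte signed scale followed by a varint of the unscaled value. A varint is a big-endian two's complement integer. Below is a minimal sketch, assuming the {Unscaled, Scale} tuple order used in the examples above. The module is hypothetical and is not cqerl's actual cqerl_datatypes code.

```erlang
-module(cql_decimal_sketch).
-export([encode_varint/1, decode_varint/1, encode_decimal/1]).

%% Smallest number of bytes N such that -2^(8N-1) =< Int < 2^(8N-1),
%% i.e. Int fits in N bytes of two's complement. Handles negative
%% values without math:log/1.
varint_bytes(Int) -> varint_bytes(Int, 1).

varint_bytes(Int, N) ->
    Limit = 1 bsl (8 * N - 1),
    case Int >= -Limit andalso Int < Limit of
        true  -> N;
        false -> varint_bytes(Int, N + 1)
    end.

%% varint: minimal big-endian two's complement encoding.
encode_varint(Int) when is_integer(Int) ->
    Bits = 8 * varint_bytes(Int),
    <<Int:Bits/big-signed-integer>>.

%% Inverse of encode_varint/1 for a binary of any length.
decode_varint(Bin) when is_binary(Bin) ->
    Bits = bit_size(Bin),
    <<Int:Bits/big-signed-integer>> = Bin,
    Int.

%% decimal: 4-byte signed scale, then the varint unscaled value.
%% {Unscaled, Scale} stands for Unscaled * 10^-Scale.
encode_decimal({Unscaled, Scale}) when is_integer(Unscaled), is_integer(Scale) ->
    Varint = encode_varint(Unscaled),
    <<Scale:32/big-signed-integer, Varint/binary>>.
```

With this approach, 128 encodes as <<0, 128>> and -128 as <<128>>, so the sign is preserved at every size boundary.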

Support Cassandra 2.1.x

Version 2.1.x of Cassandra uses native protocol v3, and to support the latest features we went straight to v4, which is only used by Cassandra v2.2.x and v3.x.
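Concretely, every request frame starts with a protocol version byte. A client that speaks only v4 therefore sends 0x04 in a frame that Cassandra 2.1, which supports up to v3, does not accept. Below is a minimal sketch of the 9-byte frame header, as laid out in the native protocol v3/v4 specifications, building a body-less OPTIONS request. The module name is hypothetical, and this is not cqerl's own framing code.

```erlang
-module(cql_frame_sketch).
-export([options_frame/2, response_version/1]).

-define(OPCODE_OPTIONS, 16#05).

%% Header: version (1 byte), flags (1), stream id (2, signed),
%% opcode (1), body length (4). v3 and v4 share this layout; only the
%% version byte differs. OPTIONS has an empty body.
options_frame(ProtocolVersion, StreamId)
  when ProtocolVersion =:= 3; ProtocolVersion =:= 4 ->
    Flags = 0,
    BodyLength = 0,
    <<ProtocolVersion:8, Flags:8, StreamId:16/big-signed-integer,
      ?OPCODE_OPTIONS:8, BodyLength:32/big-integer>>.

%% Responses echo the version with the high bit set (0x83, 0x84);
%% masking it off yields the protocol version number.
response_version(<<Version:8, _/binary>>) -> Version band 16#7F.
```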

Problem with prepared statements and Cassandra functions

I'm trying to construct a query that involves TimeUUID fields, and I want to use the minTimeuuid() and maxTimeuuid() Cassandra functions in a prepared statement like the following:

SELECT Time, Stuff FROM foo WHERE Time > minTimeuuid(?) AND Time < maxTimeuuid(?)

But I couldn't figure out which key to use in values to make that work. cqerl says I need arg0(mintimeuuid), but I couldn't work out how to set that.

(Please note that my Erlang knowledge is very basic, so I may be trying something very silly.)

broken code in master "{timeout, {gen_server,call, [cqerl, {get_client,..."

how to reproduce

  1. clone repo
  2. run make
  3. try to use it in erlang shell
$ erl -pa ebin deps/*/ebin 
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.1  (abort with ^G)
1> application:start(crypto).
ok
2> application:start(asn1).
ok
3> application:start(public_key).
ok
4> application:start(ssl).
ok
5> application:start(pooler).
ok
6> application:start(cqerl).
{error,{not_started,re2}}
7> application:start(re2).
ok
8> application:start(cqerl).
== Starting CQErl frontend. ==
ok
9> 
9> 
9> cqerl:new_client({ "10.10.40.12", 9042}, [{auth, {cqerl_auth_plain_handler, [{"developer", "paaasss"} ]}}]).
** exception exit: {timeout,
                       {gen_server,call,
                           [cqerl,
                            {get_client,
                                {{10,1,40,112},9042},
                                [{auth,
                                     {cqerl_auth_plain_handler,[{"developer","paaasss"}]}}]}]}}
     in function  gen_server:call/2 (gen_server.erl, line 204)
10> 

OS: Debian 8.2
Erlang version: 18.1 (from the Erlang Solutions repository)
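As a side note for reproducing this, the manual start sequence in the transcript can be collapsed with OTP's application:ensure_all_started/1. This includes the {not_started,re2} error, which shows that re2 is a declared dependency of cqerl. The function starts every application listed as a dependency in the target's .app file before starting the target itself. This shortens the reproduction but does not address the timeout. The wrapper module below is hypothetical.

```erlang
-module(start_deps_sketch).
-export([start/1]).

%% Start App after first starting the applications it declares as
%% dependencies in its .app file (e.g. re2 for cqerl). Returns the
%% applications this call actually started; ones already running are
%% not listed. Raises an error if any of them fails to start.
start(App) when is_atom(App) ->
    case application:ensure_all_started(App) of
        {ok, Started} -> Started;
        {error, Reason} -> erlang:error({cannot_start, App, Reason})
    end.
```

In the shell session above, the equivalent is the single call application:ensure_all_started(cqerl).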

Structure and usage of result sets

In response to a SELECT query, a #cql_result{} record is handed over, from which cqerl:head/1 or cqerl:next/1 can be used to retrieve a single row as a property list. From there, users need to rely on the proplists library to find column values.

I propose making the row an opaque, more compact data structure that could be used both to retrieve single column values and to fill user-defined records more efficiently. Something along these lines:

{ok, Result} = cqerl:run_query(Client, "SELECT * FROM users;"),
Row = cqerl:head(Result),
UserName = cqerl:get_value(username, Row),   % proposed API
UserData = cqerl:head(Result, userdata),     % proposed: returns a #userdata{} record
#userdata{username = UserName} = UserData,
{NextUserData, Result2} = cqerl:next(Result, userdata).   % proposed API
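For comparison, here is how a record can be filled today when each row is a property list keyed by column name. Each field needs its own proplists lookup, and each lookup walks the row, which is part of the overhead the proposal aims to remove. The sketch below assumes a hypothetical users table with username and email columns and atom column keys. The module, record, and function names are illustrative only.

```erlang
-module(row_to_record_sketch).
-export([to_userdata/1, fill/3]).

%% Hypothetical record mirroring a users table with two columns.
-record(userdata, {username, email}).

%% Build {RecordName, V1, V2, ...} by looking up each field name in a
%% proplist row. Columns missing from the row become undefined.
fill(RecordName, Fields, Row) when is_atom(RecordName), is_list(Row) ->
    list_to_tuple([RecordName | [proplists:get_value(F, Row) || F <- Fields]]).

%% record_info/2 is resolved at compile time, so the field order always
%% matches the record definition above.
to_userdata(Row) ->
    fill(userdata, record_info(fields, userdata), Row).
```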

killing all cassandra server connections results in a hung cqerl pool

While testing against my local Cassandra server, if I run service cassandra stop, I see the following messages:
Received message {live,{tcp_closed,#Port<0.13856>},tcp,#Port<0.13856>}
Received message {live,{tcp_closed,#Port<0.13852>},tcp,#Port<0.13852>}
Received message {live,{tcp_closed,#Port<0.13854>},tcp,#Port<0.13854>}
Received message {live,{tcp_closed,#Port<0.13800>},tcp,#Port<0.13800>}
Received message {live,{tcp_closed,#Port<0.13853>},tcp,#Port<0.13853>}

After that, all requests fail with a timeout.
