
ksql-python

A Python wrapper for the KSQL REST API, making it easy to interact with KSQL from Python.

Supported KSQLDB version: 0.10.1+
Supported Python version: 3.5+


Installation

pip install ksql

Or

git clone https://github.com/bryanyang0528/ksql-python
cd ksql-python
python setup.py install

Getting Started

Setup for KSQL

The GitHub page of KSQL: https://github.com/confluentinc/ksql

If you have installed the open source Confluent CLI (e.g. by installing the Confluent Open Source or Enterprise Platform), you can start KSQL and its dependencies with a single command:

confluent start ksql-server

Setup for ksql-python API

  • Setup for the KSQL API:
from ksql import KSQLAPI
client = KSQLAPI('http://ksql-server:8088')
  • Setup for the KSQL API with logging enabled:
import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://ksql-server:8088')
  • Setup for the KSQL API with Basic Authentication:
from ksql import KSQLAPI
client = KSQLAPI('http://ksql-server:8088', api_key="your_key", secret="your_secret")

Options

Option Type Required Description
url string yes Your ksql-server url. Example: http://ksql-server:8088
timeout integer no Timeout for requests, in seconds. Default: 5
api_key string no API Key to use on the requests
secret string no Secret to use on the requests

Main Methods

ksql

This method can be used for KSQL statements that are not covered by the more specific methods such as query, create_stream or create_stream_as. The following example shows how to execute the show tables statement:

client.ksql('show tables')
  • Example Response

    [{'tables': {'statementText': 'show tables;', 'tables': []}}]

query

Executes a SQL query and keeps listening for streaming data.

client.query('select * from table1')

This command returns a generator. It can be consumed by reading its values via next(query) or with a for loop. Here is a complete example:

from ksql import KSQLAPI
client = KSQLAPI('http://localhost:8088')
query = client.query('select * from table1')
for item in query:
    print(item)
  • Example Response

    {"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
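Each yielded item is a JSON-encoded string like the rows above. A minimal sketch for extracting the column values (assuming each yielded item is one complete JSON document; rows_from is a hypothetical helper, not part of ksql-python):

```python
import json

def rows_from(query_results):
    """Extract column lists from items yielded by client.query()
    (assumes each item is one complete JSON document)."""
    for item in query_results:
        item = item.strip()
        if not item:
            continue
        payload = json.loads(item)
        row = payload.get("row")
        if row is not None:  # skip error/terminator messages with no row
            yield row["columns"]

# With the example responses shown above:
items = ['{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}']
print(list(rows_from(items)))  # [[1512787743388, 'key1', 1, 2, 3]]
```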

Query with HTTP/2

Execute queries with the new /query-stream endpoint. Documented here

To execute a SQL query, use the same syntax as a regular query and pass the additional use_http2=True parameter.

client.query('select * from table1', use_http2=True)

A generator is returned, with the following example response:

{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]

To terminate the query above, use the close_query call, providing the queryId returned from the query call.

client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")

Insert rows into a Stream with HTTP/2

Uses the new /inserts-stream endpoint. See documentation

rows = [
        {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
        {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
    ]

results = client.inserts_stream("my_stream_name", rows)

An array of objects is returned on success, with the status of each inserted row.
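For example, to verify that every row was accepted (hedged: the exact shape of each status object depends on the ksqlDB version; {"status": "ok", "seq": n} is assumed here for illustration):

```python
# Assumed response shape: one status object per inserted row.
results = [{"status": "ok", "seq": 0}, {"status": "ok", "seq": 1}]

failed = [r for r in results if r.get("status") != "ok"]
if failed:
    raise RuntimeError("some inserts failed: {}".format(failed))
print("all {} rows inserted".format(len(results)))  # all 2 rows inserted
```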

Simplified API

create_stream/ create_table

client.create_stream(table_name=table_name,
                     columns_type=columns_type,
                     topic=topic,
                     value_format=value_format)

Options

Option Type Required Description
table_name string yes name of stream/table
columns_type list yes ex:['viewtime bigint','userid varchar','pageid varchar']
topic string yes Kafka topic
value_format string no JSON (Default) or DELIMITED or AVRO
key string for tables Key column (used for JOINs)
  • Responses
If creating the table/stream succeeds:

return True

If it fails:

raise a CreateError(response_from_ksql_server)

create_stream_as

A simplified API for CREATE STREAM ... AS SELECT statements.

client.create_stream_as(table_name=table_name,
                        select_columns=select_columns,
                        src_table=src_table,
                        kafka_topic=kafka_topic,
                        value_format=value_format,
                        conditions=conditions,
                        partition_by=partition_by,
                        **kwargs)
CREATE STREAM <table_name>
[WITH ( kafka_topic=<kafka_topic>, value_format=<value_format>, property_name=expression ... )]
AS SELECT <select_columns>
FROM <src_table>
[WHERE <conditions>]
[PARTITION BY <partition_by>];

Options

Option Type Required Description
table_name string yes name of stream/table
select_columns list yes you can select [*] or ['columnA', 'columnB']
src_table string yes name of source table
kafka_topic string no The name of the Kafka topic for this new stream (table).
value_format string no DELIMITED, JSON (Default) or AVRO
conditions string no The conditions in the WHERE clause.
partition_by string no Data will be distributed across partitions by this column.
kwargs pairs no Additional key=value pairs; see More Options below.
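As a rough illustration of how these arguments map onto the generated statement, here is a hand-written sketch of the SQL builder (an approximation for illustration only, not the library's actual implementation):

```python
def sketch_create_stream_as(table_name, select_columns, src_table,
                            kafka_topic=None, value_format="JSON",
                            conditions=None, partition_by=None):
    """Approximate the statement create_stream_as() sends to KSQL."""
    with_opts = ["value_format='{}'".format(value_format)]
    if kafka_topic:
        with_opts.insert(0, "kafka_topic='{}'".format(kafka_topic))
    sql = "CREATE STREAM {} WITH ({}) AS SELECT {} FROM {}".format(
        table_name, ", ".join(with_opts), ", ".join(select_columns), src_table)
    if conditions:
        sql += " WHERE " + conditions
    if partition_by:
        sql += " PARTITION BY " + partition_by
    return sql + ";"

print(sketch_create_stream_as(
    "pageviews_enriched", ["userid", "pageid"], "pageviews_original",
    kafka_topic="pageviews_enriched", conditions="userid = 'User_1'"))
```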

KSQL JOINs

KSQL JOINs between Streams and Tables are not supported yet via explicit methods, but you can use the ksql method for this, as in the following example:

client.ksql("CREATE STREAM join_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='join_per_user') AS SELECT Time, Amount FROM source c INNER JOIN users u on c.user = u.userid WHERE u.USERID = 1")

FileUpload

upload

Run commands from a .ksql file. Only KSQL commands are supported, not streaming queries.

from ksql.upload import FileUpload
pointer = FileUpload('http://ksql-server:8088')
pointer.upload('rules.ksql')

Options

Option Type Required Description
ksqlfile string yes name of file containing the rules
  • Responses
If the KSQL commands execute successfully:

return (list of server responses for all commands)

If failed:

raise the appropriate error

More Options

There are more properties (partitions, replicas, etc.) in the official documentation.

KSQL Syntax Reference


ksql-python's People

Contributors

bbkchdhry, ben4932042, bradleywboggs, bryanyang0528, dependabot[bot], dimino, graham2071, harlev, hharrowood, joaomedeiros95, jurieotto, kaiwaehner, kencox94, lucrussell, maulikjs, naja1s, romainr, sebastianneubauer


ksql-python's Issues

Expected 1 brokers but found only 0. Trying to query Kafka for metadata again

Added an additional kafka-connect service, but it is now failing with:

[main] ERROR io.confluent.admin.utils.ClusterStatus - Error while getting broker list.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1599952674475) timed out at 1599952674476 after 1 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:149)
at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1599952674475) timed out at 1599952674476 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
[main] INFO io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
[main] ERROR io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Brokers found [].
source /Users/mincheung/Documents/GitHub/ksql-python/.venv/bin/activate

docker-compose.yml

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.10.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
    ports:
      - "8088:8088"
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.10.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-python:
    build:
      context: .
    network_mode: host
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
    environment:
      KSQL_SERVER: localhost:8088
      STREAMS_BOOTSTRAP_SERVERS: localhost:29092
    volumes:
      - ./:/app
    command: tail -f /dev/null

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:5.5.0
    container_name: kafka-connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
    command: 
      - bash 
      - -c 
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43
        #
        # -----------
        # Launch the Kafka Connect worker
        /etc/confluent/docker/run &
        #
        # Don't exit
        sleep infinity
    volumes:
      - $PWD/data:/data    


  postgres:
    # *-----------------------------*
    # To connect to the DB: 
    #   docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:12
    container_name: postgres
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    links:
      - broker
      - schema-registry
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

  kafka-rest:
    image: confluentinc/cp-kafka-rest:5.5.0
    hostname: kafka-rest
    ports:
    - "8082:8082"
    depends_on:
      - schema-registry
    environment:
      KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_REST_SCHEMA_REGISTRY_URL: schema-registry:8081
      KAFKA_REST_HOST_NAME: kafka-rest
      KAFKA_REST_LISTENERS: http://kafka-rest:8082

KSQL 5.0 Query not working

First of all, great to see the 5.0 migration merged into the project.

I tried the updated project with my KSQL demo. I started Kafka and KSQL via the Confluent CLI and your Python app can connect to it successfully. "Show Streams" works well, but the simple "select * from X" query does not work yet. It returns <generator object query at 0x10a4ff370>.

kai.waehner@Kais-MacBook-12:~|⇒ python
Python 2.7.14 |Anaconda custom (64-bit)| (default, Oct 5 2017, 02:28:52)
[GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://localhost:8088')
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /info HTTP/1.1" 200 None
client.ksql('show tables')
DEBUG:root:KSQL generated: show tables
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /ksql HTTP/1.1" 200 None
[{u'tables': [], u'@type': u'tables', u'statementText': u'show tables;'}]
client.ksql('show streams')
DEBUG:root:KSQL generated: show streams
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /ksql HTTP/1.1" 200 None
[{u'streams': [{u'topic': u'pageviews', u'type': u'STREAM', u'name': u'PAGEVIEWS_ORIGINAL', u'format': u'DELIMITED'}], u'@type': u'streams', u'statementText': u'show streams;'}]
client.query('select * from pageviews_original')
<generator object query at 0x10a4ff370>

Any ideas?

It is a normal KSQL Stream, nothing special:

ksql> describe pageviews_original;

Name : PAGEVIEWS_ORIGINAL
Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)

For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

TLS and Certificates

Hi - I'm wondering if there's anything in the works for TLS and client certificate support for the KSQLAPI python module?

We've currently got a proof-of-concept Python script that talks directly to KSQL via the requests module, because we require https/certificate support, but it would be nice to use your KSQLAPI module instead. I'm happy to have a go at figuring out what changes would need to be made to the KSQLAPI and submit them back (though I'm not a Python developer, so it will need to be checked!).

ksql.upload.FileUpload.upload() gives InvalidQueryError

The following snippet from ksql/upload.py:38 has a problem which causes the InvalidQueryError exception to be raised when parsing a file that has one statement per line, with each statement ending in a semicolon. The issue seems to be caused by rule being an empty string after yielding: the last two lines then check that the empty rule ends with a semicolon, but the last rule has already been yielded!

I believe the fix is to simply delete these lines.

    def get_rules_list(self, ksqlfile):
        with open(ksqlfile) as rf:
            rule = ""

            for line in rf:
                rule = rule + " " + line.strip()

                if rule[-1:] == ";":
                    yield rule
                    rule = ""

            if rule[-1:] != ";":
                raise InvalidQueryError(rule)
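A corrected version along those lines (a sketch: adapted to take an iterable of lines rather than a filename so it is easy to test, with a local stand-in for ksql.errors.InvalidQueryError, and raising only when unterminated text is actually left over):

```python
class InvalidQueryError(Exception):
    """Stand-in for ksql.errors.InvalidQueryError."""

def get_rules_list(lines):
    rule = ""
    for line in lines:
        rule = rule + " " + line.strip()
        if rule.endswith(";"):
            yield rule
            rule = ""
    if rule.strip():  # only a real leftover fragment is an error now
        raise InvalidQueryError(rule)

# A file whose last statement ends with ';' no longer raises:
stmts = list(get_rules_list(["CREATE STREAM s (x INT);", "SHOW TABLES;"]))
print([s.strip() for s in stmts])  # ['CREATE STREAM s (x INT);', 'SHOW TABLES;']
```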

Query timeout if I don't put LIMIT in the query

How can we get continuous data from a stream?
client = KSQLAPI('http://127.0.0.1:8088')
streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

def cal():
    def events():
        query_string = "select * from trend_news_stream emit changes;"
        q = client.query(query_string, stream_properties=streamProperties, idle_timeout=10)
        for c in q:
            yield c
    return Response(events(), mimetype="text/event-stream")

Does anyone manage this Repository?

Hi, everyone.

Does anyone manage this repository?
Pull requests have been piling up for a long time, and there is no response to issues.

I need to fix a bug; can I make a request?

Improve code coverage to 100%

It would be nice to get the code coverage to 100%. Then the report would immediately jump out at you whenever a line of code is missed during any newly added feature.

I would not mind getting stuck into the testing, but most of the unit-testing Python packages and tools are new to me. @bryanyang0528, would it perhaps be possible for you to add some unit-testing instructions to help me get started with this endeavor?

SELECT Query splits single result into three

I'm struggling to realize a relatively simple example. My SELECT query uses LIMIT 1, i.e. it returns just one result (I also sent just one single test message to the Kafka topic), but the result is split up into three different lines in the return object.

query = client.query('SELECT * FROM CREDITCARDFRAUD_PREPROCESSED_AVRO LIMIT 1')

#print(len(list(query))) #=> shows 3

print(list(query))

The latter print returns:

['\n\n{"row":{"columns":[1547195011919,null,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.46', '2387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705', ',-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"]},"errorMessage":null,"finalMessage":null}\n{"row":null,"errorMessage":null,"finalMessage":"Limit Reached"}\n']

Note the break (which is not just a space: it splits the three list items) between ,0.46', '2387777762292, and between 0.251412098239705', ',-0.018306777944153 (you can see it in the attached screenshot of my Jupyter notebook).

[screenshot of the Jupyter notebook output]

As you can also see, we tried to parse this somehow to get back one result instead of three (as a quick hotfix), but we have not gotten it working yet.

KSQL Query function is timing out but working fine in cli and curl

When I make a KSQL query using the ksql-cli and curl, it works fine. Making the same query using this library doesn't work. After investigating the issue, it looks like the problem is how the Python requests library handles the results: https://stackoverflow.com/a/28156068/4395533.
This quote from the Stack Overflow user larsks, who answered the question, summarises the problem:

This behaviour is due to a buggy implementation of the iter_lines method in the requests library.

iter_lines iterates over the response content in chunk_size blocks of data using the iter_content iterator. If there are less than chunk_size bytes of data available for reading from the remote server (which will typically be the case when reading the last line of output), the read operation will block until chunk_size bytes of data are available

I made the same request in golang using the net/http package and the behaviour was the same, so one could argue that the implementation of the clients making the requests is correct and it's ksql that is returning results in a sub-optimal way.

Of course, the query works fine in curl, and the workaround we are using is to make a call to curl through Python's subprocess library and then pipe the data back to our application. I guess that is a case for the issue being with how this library is handling responses.
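A client-side workaround (independent of curl) is to buffer the raw chunks yourself and only emit complete newline-delimited records; a sketch, not part of ksql-python:

```python
def iter_complete_lines(chunks):
    """Reassemble newline-delimited records from arbitrarily split chunks."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        while "\n" in buf:
            line, buf = buf.split("\n", 1)
            if line.strip():
                yield line
    if buf.strip():  # flush a trailing record with no final newline
        yield buf

# Records split mid-way across chunks are stitched back together:
chunks = ['{"row":1}\n{"ro', 'w":2}\n']
print(list(iter_complete_lines(chunks)))  # ['{"row":1}', '{"row":2}']
```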

IndexError after inserting

I'm receiving the following error after inserting into a stream using

client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);")

Traceback (most recent call last):
File "", line 1, in
File "/home/ubuntu/.local/lib/python3.6/site-packages/ksql/client.py", line 42, in ksql
return self.sa.ksql(ksql_string, stream_properties=stream_properties)
File "/home/ubuntu/.local/lib/python3.6/site-packages/ksql/api.py", line 64, in ksql
self._raise_for_status(r, response)
File "/home/ubuntu/.local/lib/python3.6/site-packages/ksql/api.py", line 54, in _raise_for_status
if r_json[0]["@type"] == "currentStatus" and r_json[0]["commandStatus"]["status"] == "ERROR":
IndexError: list index out of range

Although the insert is successful and I can view it on a select query, any ideas about what's causing it to throw an error?

Add setting of stream properties support

For example it is not possible to run the KSQL query:
SELECT * FROM pageviews;
with the properties:
SET 'auto.offset.reset' = 'earliest';

In this example, allowing the configuration of the 'auto.offset.reset' property, allows for queries from the start of the data and not just from the current data position.

The KSQL JSON format does support the setting of the stream properties and can be set as follows:

POST /query HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "SELECT * FROM pageviews;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

I propose the following interface for this feature:
client.query("SELECT * FROM pageviews;", streams_properties={'auto.offset.reset': 'earliest'})

I am busy working on this feature, because it is quite useful for me.

release new version based on current master branch?

There's a bugfix on the master branch that's not currently available when you download this from pypi: namely a try...except StopIteration block was added in the process_query_results function in ksql.utils module.

Can you get a new minor version released to PyPI of the master branch?

Querying KSQL table with ksql-python returning only header but no records

Hello, please help me with the following issue:

As background, I want to use Debezium to generate messages from an Oracle table, from which to generate events, and I created a KSQL table from the topic using AVRO format and Schema Registry. But the problem with the table is that it returns only the column types as the header, not the record itself. When I try to use the same query with the ksql-cli, it returns the record.

I'm executing this:

client = KSQLAPI(url)
res = client.query("SELECT * FROM TRANSACTION_TYPES WHERE TRT_SERIAL = 4 EMIT CHANGES")
for x in res:
    print(x)

And what I got as a result is:

{"header":{"queryId":"none","schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `TRT_ARREARS_EFFECTIVE` STRING, `TRT_CODE` STRING, `TRT_DESCRIPTION` STRING, `TRT_SERIAL` BIGINT, `TRT_TRANSACTION_OR_ACCRUAL` STRING, `TRT_CASH_TRANSACTION` STRING, `TRT_CREDIT_ALLOWED` STRING, `TRT_DEBIT_ALLOWED` STRING, `TRT_DEBTOR_TRANSACTIONS` STRING, `TRT_INPUT_OR_AUTOMATIC` STRING, `TRT_PRODUCE_CHEQUE` STRING, `TRT_QUERY_ALLOWED` STRING, `TRT_SUSPENSE_TRANSACTION` STRING, `TRT_FEE_TRANSACTION` STRING, `TRT_ALIAS` BIGINT, `TRT_THIRD_PARTY_TRANSACTION` STRING, `TRT_INVOICE_TYPE` BIGINT, `TRT_SETTLEMENT_TRANSACTION` STRING, `TRT_INTEREST_ACCRUAL_EFFECTIVE` STRING, `TRT_EXPENSE_TRANSACTION` STRING, `TRT_RECOVERY_TRANSACTION` STRING, `TRT_CAPITAL_REPAYMENT` STRING, `TRT_SPLITTER_TYPE_VLC` BIGINT, `TRT_INCLUDE_ON_STATEMENT` STRING, `TRT_IGNORE_PAYOUT_DELAY` STRING, `SERIAL` BIGINT, `__OP` STRING, `__TABLE` STRING, `__SOURCE_TS_MS` BIGINT, `__DELETED` STRING"}}

ksql has undeclared build-time dependencies

A Poetry user ran into undeclared build-time dependencies in ksql -- namely, pip is imported in setup.py but there is no declaration of this dependency:

ksql-python/setup.py

Lines 10 to 13 in 6161b48

if LooseVersion(pip.__version__) >= "10.0.0":
from pip._internal.req import parse_requirements
else:
from pip.req import parse_requirements

If you want to reliably import pip during a build, you need to declare the dependency somewhere. The modern way to do this (as described at that link) is to list everything in build-system.requires of your pyproject.toml; however, you can also use the deprecated setup_requires argument to setup() if you prefer.

CreateError when attempting to create a table

I'm getting an error when trying to create a table. This is the code:

from ksql import KSQLAPI
client = KSQLAPI('http://localhost:8088')
client.create_table(table_name='mytable',
                    columns_type=['viewtime bigint', 'userid varchar', 'pageid varchar'],
                    topic='mytopic',
                    value_format='JSON')

and the error:

Traceback (most recent call last):
  File "/home/lrussell/Dev/ksql-plotly-dash/basic.py", line 6, in <module>
    value_format='JSON')
  File "/home/lrussell/.virtualenvs/ksql-plotly-dash/lib/python3.6/site-packages/ksql/client.py", line 50, in create_table
    value_format = value_format)
  File "/home/lrussell/.virtualenvs/ksql-plotly-dash/lib/python3.6/site-packages/ksql/api.py", line 142, in create_table
    value_format=value_format)
  File "/home/lrussell/.virtualenvs/ksql-plotly-dash/lib/python3.6/site-packages/ksql/api.py", line 164, in _create
    return self._parse_ksql_res(r, CreateError)
  File "/home/lrussell/.virtualenvs/ksql-plotly-dash/lib/python3.6/site-packages/ksql/api.py", line 40, in _parse_ksql_res
    raise CreateError(r)
ksql.errors.CreateError: Cannot define a TABLE without providing the KEY column name in the WITH clause.

I think the error might be related to this line. Maybe it needs an extra value in the WITH clause for the key? The key isn't listed as a required property for creating tables according to the KSQL documentation, but possibly something has changed recently.

pip package broken

Still get an error:
/opt/anaconda3/bin/python -m pip install --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org --upgrade ksql
Collecting ksql
Downloading https://files.pythonhosted.org/packages/98/9f/1187d5bf9ab6e68c23fde44f93df2e1aae94083e1c6608a299aa1c6a99f5/ksql-0.5.1.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "", line 1, in
File "/tmp/pip-install-cwddsvt7/ksql/setup.py", line 52, in
install_requires=get_install_requirements("requirements.txt"),
File "/tmp/pip-install-cwddsvt7/ksql/setup.py", line 27, in get_install_requirements
content = open(os.path.join(os.path.dirname(__file__), path)).read()
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pip-install-cwddsvt7/ksql/requirements.txt'

----------------------------------------

Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-cwddsvt7/ksql/

Originally posted by @crocus in #51 (comment)

Create a KSQLdb table from a stream

I'm trying to create a table with the ksql Python client. My query is:

CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT
METEO_RAW.TYPE TYPE,
METEO_RAW.NOME_UNITA NOME_UNITA,
AVG(METEO_RAW.VALORE) AVG_VALUE
FROM METEO_RAW METEO_RAW
WINDOW TUMBLING ( SIZE 5 MINUTES )
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA
where METEO_RAW.NOME_UNITA = 'Unita 1' AND
METEO_RAW.ID_SENSORE = 2
EMIT CHANGES;

where METEO_RAW is a stream from a Kafka topic. In the KSQLdb UI it works fine.
So I try the sample command to submit a query:

client.create_stream_as(table_name="AVG5MINTEMP",
                        select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(NOME_UNITA.VALORE) AVG_VALUE"],
                        src_table="METEO_RAW",
                        kafka_topic="AVG5MINTEMP",
                        value_format='JSON',
                        conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")

but I receive the error:
but I receive the error:

DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:199: mismatched input 'WINDOW' expecting {'(', 'EMIT', 'CHANGES', 'FINAL', 'NOT', 'ESCAPE', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'KEY', 'SINK', 'SOURCE', 'PRIMARY', 'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', '+', '-', STRING, INTEGER_VALUE, DECIMAL_VALUE, FLOATING_POINT_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(NOME_UNITA.VALORE) AVG_VALUE FROM METEO_RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}

I then tried to use create_stream_as in this manner:

client.create_stream_as(table_name="AVG5MINTEMP",
                        select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(METEO_RAW.VALORE) AVG_VALUE"],
                        src_table="METEO_RAW METEO RAW",
                        kafka_topic="AVG5MINTEMP",
                        value_format='JSON',
                        conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")

but I receive the error:

DEBUG:root:KSQL generated: CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;
DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:198: mismatched input 'RAW' expecting {';', 'EMIT', 'WHERE', 'WINDOW', 'GROUP', 'HAVING', 'LIMIT', 'PARTITION'}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}

Can you advise how to create a table from a stream in this case?
Thanks.
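One workaround, assuming only that the generic client.ksql() method accepts arbitrary statements (as the README shows): build the full CREATE TABLE AS SELECT yourself, since create_stream_as does not expose WINDOW or GROUP BY clauses. A sketch, reusing the statement that already works in the UI (note ksql expects the clause order FROM, WINDOW, WHERE, GROUP BY):

```python
# Sketch: bypass create_stream_as and submit the complete statement via client.ksql().
statement = (
    "CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', VALUE_FORMAT='JSON') AS "
    "SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, "
    "AVG(METEO_RAW.VALORE) AVG_VALUE "
    "FROM METEO_RAW "
    "WINDOW TUMBLING (SIZE 5 MINUTES) "
    "WHERE METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE = 2 "
    "GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA "
    "EMIT CHANGES;"
)
# client.ksql(statement)  # requires a running ksqlDB server
```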

client.ksql('show tables') returns error 'not all arguments converted during string formatting'

from ksql import KSQLAPI

api_key = 'ZD74E3GRK4QXWO6W'
api_secret = 'RByQinKf4ZYodiBLuCKybx92SSPrQwEwnA8DOaVfJEhAVf3LQ096yFteZkep4XKx'
ksql_endpoint = 'https://pksqlc-42o7q.us-east-1.aws.confluent.cloud:443'
client = KSQLAPI(ksql_endpoint, api_key=api_key, secret=api_secret)
client.ksql('show tables')

This code returns:

not all arguments converted during string formatting

The offending code is in line 108 of api.py

base64string = base64.b64encode('{}:{}' % (self.api_key, self.secret))

Other calls to client return the same error, such as
client.query('select userid from users')
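The root cause is that '{}:{}' is a str.format template being combined with the % operator, which produces exactly the "not all arguments converted during string formatting" error, and b64encode additionally needs bytes, not str. A minimal sketch of the fix (hypothetical credentials):

```python
import base64

api_key, secret = "my_key", "my_secret"  # hypothetical values
# '{}:{}' % (...) fails; use .format() and encode to bytes before base64-encoding
token = base64.b64encode("{}:{}".format(api_key, secret).encode("utf-8")).decode("ascii")
auth_header = "Basic " + token
```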

pip package broken

MacOSX 10.14.2
Python3 installed using brew

running pip install ksql gives this error:

Collecting ksql
Downloading https://files.pythonhosted.org/packages/98/9f/1187d5bf9ab6e68c23fde44f93df2e1aae94083e1c6608a299aa1c6a99f5/ksql-0.5.1.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/private/var/folders/jp/s96xr96s7k3g94fsvpbdh9cr0000gn/T/pip-install-tkknoxhn/ksql/setup.py", line 52, in <module>
    install_requires=get_install_requirements("requirements.txt"),
  File "/private/var/folders/jp/s96xr96s7k3g94fsvpbdh9cr0000gn/T/pip-install-tkknoxhn/ksql/setup.py", line 27, in get_install_requirements
    content = open(os.path.join(os.path.dirname(__file__), path)).read()
FileNotFoundError: [Errno 2] No such file or directory: '/private/var/folders/jp/s96xr96s7k3g94fsvpbdh9cr0000gn/T/pip-install-tkknoxhn/ksql/requirements.txt'

I think the requirements.txt file is missing from the source distribution.

KSQL RESTful API Basic Authentication support missing

Hi,

Our KSQL server has Basic HTTP authentication (username:password) enabled. I don't see the relevant options when creating the client, i.e. ksql.KSQLAPI(serverIP:port).

Curl with the -u user:password parameter works fine for me.

*return* value, don't *print* it?

Is there a reason that BaseAPI.query prints the returned data to the console instead of returning it? The .ksql method returns the value (as I would expect), why is the behavior different?

For the time being, I'm inheriting and overwriting this method:

     def query(self, query_string, encoding='utf-8', chunk_size=128):
         """
         Process streaming incoming data.
 
         """
         r = self._request(endpoint='query', sql_string=query_string)
 
-        for chunk in r.iter_content(chunk_size=chunk_size):
-            if chunk != b'\n':
-                print(chunk.decode(encoding))
+        r = [ a.decode(encoding) for a in r.iter_lines(chunk_size=chunk_size) if not a in [b'\n', b''] ]
+        return(r)

Side note: I use .iter_lines instead of .iter_content, because the latter is splitting lines. For example, if I return the first r immediately after self._request(...), then I can get something like this:

In [884]: r = myclient.query("select a,b from t_loop limit 3")
In [885]: list(r.iter_content(chunk_size=128))
Out[885]: 
[b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'\n',
 b'{"row":{"columns":["90","1"]},"errorMessage":null}',
 b'\n',
 b'\n',
 b'{"row":{"columns":["94","2"]},"errorMessage":null}',
 b'\n',
 b'{"row":{"columns":["47","2"]},"errorMessage":null}',
 b'\n',
 b'\n',
 b'\n',
 b'\n{"row":null,"errorMessage":{"@type":"generic_error","error_code":50000,"message":"LIMIT reached for the partition.","stackTrace',
b'":["io.confluent.ksql.structured.QueuedSchemaKStream$QueuePopulator.apply(QueuedSchemaKStream.java:179)","io.confluent.ksql.stru',

where the long error message continues as individual elements in the returned list. To use any of these, I can json.loads each element until I reach the error, at which point the b'\n{"row":null,"errorMessage":..."stackTrace' fragment will not parse. Why? Because it is incomplete JSON, requiring concatenation with the subsequent lines (32 lines for this error). If I wanted to json.loads them anyway, I would have to try a single element; if that fails, concatenate it with the next; and keep going until I find a complete subsequence of elements containing the entire JSON. Possible, but inefficient.

Instead:

In [887]: r = myclient.query("select a,b from t_loop limit 3")
In [888]: list(r.iter_lines())
Out[888]: 
[b'',
 b'',
 b'',
 b'',
 b'',
 b'',
 b'{"row":{"columns":["71","2"]},"errorMessage":null}',
 b'{"row":{"columns":["4","3"]},"errorMessage":null}',
 b'',
 b'{"row":{"columns":["91","3"]},"errorMessage":null}',
 b'',
 b'',
 b'',
 b'{"row":null,"errorMessage":{"@type":"generic_error","error_code":50000,"message":"LIMIT reached for the partition.","stackTrace":["io.confluent.ksql.structured.QueuedSchemaKStream$QueuePopulator.apply(QueuedSchemaKStream.java:179)","io.confluent.ksql.structured.QueuedSchemaKStream$Que...

and the error message (truncated here) is contained in one element in the list. This way, I know that a long return-value is always in one element.
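With line-based iteration, each non-empty element is complete JSON, so consuming the stream becomes straightforward. A sketch using sample bytes in the same shape as above:

```python
import json

raw_lines = [  # sample output shaped like iter_lines() results above
    b'', b'{"row":{"columns":["71","2"]},"errorMessage":null}',
    b'', b'{"row":{"columns":["4","3"]},"errorMessage":null}',
]
rows = []
for line in raw_lines:
    if not line.strip():
        continue  # skip keep-alive blank lines
    payload = json.loads(line.decode("utf-8"))
    if payload.get("errorMessage"):  # e.g. "LIMIT reached for the partition."
        break
    rows.append(payload["row"]["columns"])
```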

Abandoned Repo

I've contacted the owner of this repo via LinkedIn since he has yet to make any updates (pull requests, commits, etc.) over the past year.

I will give it a week and then, depending on how the community feels I will attempt to take ownership following these steps

Please let me know how you feel about this. I regularly use this library and it would be a shame to let it idle and eventually die.

Tagging contributors

@romainr
@kaiwaehner
@lucrussell
@jurieotto
@harlev
@dimino
@sebastianneubauer

Bug: PEP 479 StopIteration leads to RuntimeError

In Python 3.7 and above, StopIteration exceptions raised directly or indirectly in coroutines and generators are transformed into RuntimeError exceptions. This means that the next()

header = next(results)
will break things if it reaches the end of the results generator. See the discussion here: https://stackoverflow.com/questions/51700960/runtimeerror-generator-raised-stopiteration-every-time-i-try-to-run-app.

To fix this, simply replace the line mentioned above with

    try:
        header = next(results)
    except StopIteration:
        return

How to do a JOIN with the Python API?

How do I do a JOIN with the Python API?

Here are my STREAM and TABLE:

client.create_stream(table_name='creditcardfraud_source',
                     columns_type=['Id bigint', 'Timestamp varchar', 'User varchar', 'Time int', 'V1 double', 'V2 double', 'V3 double', 'V4 double', 'V5 double', 'V6 double', 'V7 double', 'V8 double', 'V9 double', 'V10 double', 'V11 double', 'V12 double', 'V13 double', 'V14 double', 'V15 double', 'V16 double', 'V17 double', 'V18 double', 'V19 double', 'V20 double', 'V21 double', 'V22 double', 'V23 double', 'V24 double', 'V25 double', 'V26 double', 'V27 double', 'V28 double', 'Amount double', 'Class string'],
                     topic='creditcardfraud_source',
                     value_format='DELIMITED')

client.create_table(table_name='users',
                     columns_type=['userid varchar', 'gender varchar', 'regionid varchar'],
                     topic='users',
                     key='userid',
                     value_format='AVRO')

Not sure how to configure the JOIN? Or is this not possible yet?
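As far as the documented API shows, ksql-python has no dedicated JOIN helper, so one option is to submit the JOIN as a raw statement through the generic ksql() method. A sketch against the stream and table above (the target stream name and selected columns are hypothetical):

```python
# Sketch: a stream-table join submitted as a raw statement via client.ksql().
join_stmt = (
    "CREATE STREAM fraud_enriched AS "  # hypothetical target name
    "SELECT s.Id, s.Amount, u.gender, u.regionid "
    "FROM creditcardfraud_source s "
    "LEFT JOIN users u ON s.User = u.userid "
    "EMIT CHANGES;"
)
# client.ksql(join_stmt)  # requires a running server
```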

[Question] How to delete topic ?

Use kafka-topics.sh --delete to delete the remove-me topic:

$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic remove-me
Topic remove-me is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

But how can I do this using this repo?
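ksql-python has no topic-deletion helper, but ksqlDB itself can delete the backing topic when dropping the stream or table that owns it (DROP ... DELETE TOPIC); a bare topic with no stream or table still needs a Kafka admin tool. A sketch via the generic ksql() method, with a hypothetical stream name:

```python
# DROP ... DELETE TOPIC removes both the stream and its backing Kafka topic.
drop_stmt = "DROP STREAM IF EXISTS remove_me DELETE TOPIC;"  # hypothetical stream name
# client.ksql(drop_stmt)  # persistent queries reading the stream must be terminated first
```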

Error with lowercase columns

The regex used to parse the column names expects them to be all uppercase, but some queries and columns may be lowercase, even though that is not the default in ksqlDB:

regex = r"(?<!\<)`(?P<name>[A-Z_]+)` (?P<type>[A-z]+)[\<, \"](?!\>)"
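Widening both character classes fixes this; note that [A-z] is itself a subtle bug, since it also matches the ASCII characters between 'Z' and 'a' (such as '[' and '_'), so [A-Za-z] is the safer class. A sketch:

```python
import re

# Accept lower- and mixed-case identifiers; [A-Za-z] avoids the [A-z] pitfall.
regex = r"(?<!\<)`(?P<name>[A-Za-z_]+)` (?P<type>[A-Za-z]+)[\<, \"](?!\>)"
m = re.search(regex, "`name` STRING,")
```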

Python 3.10 Incompatibility

Traceback (most recent call last):
  File "/app/consumer.py", line 17, in <module>
    import ksql
  File "/usr/local/lib/python3.10/site-packages/ksql/__init__.py", line 6, in <module>
    from ksql.client import KSQLAPI  # noqa
  File "/usr/local/lib/python3.10/site-packages/ksql/client.py", line 4, in <module>
    from ksql.api import SimplifiedAPI
  File "/usr/local/lib/python3.10/site-packages/ksql/api.py", line 12, in <module>
    from hyper import HTTPConnection
  File "/usr/local/lib/python3.10/site-packages/hyper/__init__.py", line 11, in <module>
    from .common.connection import HTTPConnection
  File "/usr/local/lib/python3.10/site-packages/hyper/common/connection.py", line 9, in <module>
    from ..http11.connection import HTTP11Connection
  File "/usr/local/lib/python3.10/site-packages/hyper/http11/connection.py", line 13, in <module>
    from collections import Iterable, Mapping
ImportError: cannot import name 'Iterable' from 'collections' (/usr/local/lib/python3.10/collections/__init__.py)

Looks like this dependency is using a deprecated API

DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working

"An ImportError will be raised as collections.Iterable will be in collections.abc.Iterable"

this is coming from https://github.com/python-hyper/hyper which is deprecated so I guess it's time to switch to https://www.python-httpx.org/ or similar.

IndexError from client.ksql("SET ...")

There seems to be an error in

ksql-python/ksql/api.py

Lines 44 to 62 in 9cd96cd

def _raise_for_status(r, response):
    r_json = json.loads(response)
    if r.getcode() != 200:
        # seems to be the new API behavior
        if r_json.get("@type") == "statement_error" or r_json.get("@type") == "generic_error":
            error_message = r_json["message"]
            error_code = r_json["error_code"]
            stackTrace = r_json["stack_trace"]
            raise KSQLError(error_message, error_code, stackTrace)
        else:
            raise KSQLError("Unknown Error: {}".format(r.content))
    else:
        # seems to be the old API behavior, so some errors have status 200, bug??
        if r_json and r_json[0]["@type"] == "currentStatus" and r_json[0]["commandStatus"]["status"] == "ERROR":
            error_message = r_json[0]["commandStatus"]["message"]
            error_code = None
            stackTrace = None
            raise KSQLError(error_message, error_code, stackTrace)
    return True

client.ksql("SET 'ksql.query.pull.table.scan.enabled' = 'true';")

gives IndexError: list index out of range on line 57
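A defensive sketch of the guard: the response to a SET statement does not carry the usual command-status array, so the old-API branch has to verify the parsed body is a non-empty list before indexing into it. Names mirror the snippet above, but the error type is simplified for illustration:

```python
import json

def raise_for_status_safe(status_code, body):
    """Guarded version of the check above (sketch; simplified error type)."""
    r_json = json.loads(body)
    if status_code != 200:
        raise RuntimeError(r_json.get("message", "unknown error"))
    # guard the r_json[0] access: SET responses can be empty or non-list
    if (isinstance(r_json, list) and r_json
            and r_json[0].get("@type") == "currentStatus"
            and r_json[0]["commandStatus"]["status"] == "ERROR"):
        raise RuntimeError(r_json[0]["commandStatus"]["message"])
    return True
```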

Allow setting auto.offset.reset independent of a specific query

I would like to set

SET 'auto.offset.reset'='earliest';
SET 'auto.offset.reset'='latest';

I hoped it would work like the following with the generic 'client.ksql' command:
client.ksql("SET 'auto.offset.reset'='earliest';")

It works well from KSQL CLI, so I think it is not a KSQL issue, but a limitation in the Python wrapper?

I found this related ticket:
#39
Though, I think it is much more cumbersome if you have to add this property to every query instead of doing it just once before executing several queries afterwards.
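Until the wrapper exposes this directly, one option is to lean on the REST API shape: the ksqlDB /ksql and /query endpoints accept a streamsProperties map alongside the statement, so the offset reset can ride along with each request (and the tracebacks elsewhere in this tracker suggest the client's ksql() already takes a stream_properties argument). A sketch of the payload, with a hypothetical statement:

```python
import json

# Hypothetical statement; the key point is the streamsProperties map,
# which the ksqlDB REST endpoints accept next to the "ksql" field.
payload = {
    "ksql": "SELECT * FROM my_stream EMIT CHANGES;",
    "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
}
body = json.dumps(payload)
# POST this body to http://ksql-server:8088/query
```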

HTTP/2 Configurable Timeout to avoid socket.timeout error

I am getting a socket timeout error when calling inserts_stream, and I am unable to increase the default socket timeout.
The timeout parameter in KSQLAPI's constructor does not affect inserts_stream.

timeout is used to construct a SimplifiedAPI object
This is only used in the parent class BaseApi in its _request function, which uses HTTP/1 with urllib
These HTTP/1 calls are used by the query and ksql functions

The timeout is not used by HTTP/2 calls
The HTTP/2 calls depend on Hyper, which had an issue requesting support for timeouts, but it appears it wasn't addressed before the repo was deprecated
This affects the inserts_stream function and query when use_http2=True

Due to hyper's deprecation it is advised to migrate to httpx which is discussed in #102
This was started but abandoned in #107
This was published to pypi as https://pypi.org/project/pykSQL/0.11.0/, but it doesn't work as calling inserts_stream causes a bad request from ksql saying '{"@type":"generic_error","error_code":40004,"message":"This endpoint is only available when using HTTP2"}'

If the migration to httpx is completed, then it is simple to pass the same timeout for HTTP/2 requests as httpx supports this
response = httpx.post(url, data=row, headers=headers, timeout=self.timeout)

Response of client.query RuntimeError

I'm trying to read from ksql in python with this script:

from ksql import KSQLAPI

client = KSQLAPI('http://0.0.0.0:8088',)
columns = ['id INT','name VARCHAR']
client.create_stream(table_name='students', columns_type= columns, topic='students')
query = client.query("SELECT name FROM students WHERE id = 1 ")
for student in query:
    print(student)

I was expecting a sequence of objects, as the documentation explains; instead it returns strings representing pieces of an array of objects, with a header and, at the end, a "RuntimeError: generator raised StopIteration" (each row is generated separately):

[{"header":{"queryId":"transient_STUDENTS_5788262560238090205","schema":"`NAME` STRING"}},

{"row":{"columns":["Alex"]}},

]

---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
[...]
The above exception was the direct cause of the following exception:
RuntimeError                              Traceback (most recent call last)
[...]
RuntimeError: generator raised StopIteration

This way I have to handle the exception or break the loop when I find the ']' character. Also, I need to clean up each string before deserializing it into a Python object with json.loads(str), because on its own it is not valid JSON (due to the trailing comma and newline).
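The cleanup can be wrapped once, assuming the chunks look like the fragments above (stray '[' and ']' delimiters and trailing commas). A sketch:

```python
import json

def parse_query_chunks(chunks):
    """Strip the array punctuation ksql-python leaks through and parse each row (sketch)."""
    for chunk in chunks:
        cleaned = chunk.strip().strip(",").lstrip("[").rstrip("]").strip()
        if not cleaned:
            continue  # bare delimiters reduce to nothing
        payload = json.loads(cleaned)
        if payload.get("row"):  # header chunks have no "row" key
            yield payload["row"]["columns"]

chunks = [
    '[{"header":{"queryId":"q_1","schema":"`NAME` STRING"}},',  # hypothetical query id
    '{"row":{"columns":["Alex"]}},',
    ']',
]
rows = list(parse_query_chunks(chunks))
```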

Incompatibility with pep517 and Installation Failure

Description:
The ksql-python package encounters compatibility issues with pep517 during installation using tools where this pep is considered the default, resulting in the following error:

❯ python -m pip wheel --use-pep517 .
Processing /home/shahin/projects/oss/ksql-python
  Installing build dependencies ... done
  Getting requirements to build wheel ... error
  error: subprocess-exited-with-error
  
  × Getting requirements to build wheel did not run successfully.
  │ exit code: 1
  ╰─> [17 lines of output]
      Traceback (most recent call last):
        File "/nix/store/dm99mzlng40fq8cccq3vdlkam98i1v6n-devenv-profile/lib/python3.10/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py", line 353, in <module>
          main()
        File "/nix/store/dm99mzlng40fq8cccq3vdlkam98i1v6n-devenv-profile/lib/python3.10/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py", line 335, in main
          json_out['return_val'] = hook(**hook_input['kwargs'])
        File "/nix/store/dm99mzlng40fq8cccq3vdlkam98i1v6n-devenv-profile/lib/python3.10/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py", line 118, in get_requires_for_build_wheel
          return hook(config_settings)
        File "/tmp/pip-build-env-vuq_mqb9/overlay/lib/python3.10/site-packages/setuptools/build_meta.py", line 341, in get_requires_for_build_wheel
          return self._get_build_requires(config_settings, requirements=['wheel'])
        File "/tmp/pip-build-env-vuq_mqb9/overlay/lib/python3.10/site-packages/setuptools/build_meta.py", line 323, in _get_build_requires
          self.run_setup()
        File "/tmp/pip-build-env-vuq_mqb9/overlay/lib/python3.10/site-packages/setuptools/build_meta.py", line 487, in run_setup
          super(_BuildMetaLegacyBackend,
        File "/tmp/pip-build-env-vuq_mqb9/overlay/lib/python3.10/site-packages/setuptools/build_meta.py", line 338, in run_setup
          exec(code, locals())
        File "<string>", line 8, in <module>
      ModuleNotFoundError: No module named 'pip'
      [end of output]
  
  note: This error originates from a subprocess, and is likely not a problem with pip.
error: subprocess-exited-with-error

× Getting requirements to build wheel did not run successfully.
│ exit code: 1
╰─> See above for output.

note: This error originates from a subprocess, and is likely not a problem with pip.

[notice] A new release of pip is available: 23.0.1 -> 23.1.2
[notice] To update, run: pip install --upgrade pip

As a result of this issue, package managers compatible with pep517, such as poetry and pdm, are unable to install the ksql-python package, displaying the aforementioned error message.

Upon further investigation, I have identified the root cause of the problem. The setup.py file includes an unused import statement:

import pip

if LooseVersion(pip.__version__) >= "10.0.0":
    from pip._internal.req import parse_requirements
else:
    from pip.req import parse_requirements

Will open a PR to address it.
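For reference, the usual fix is to read requirements.txt directly instead of importing pip at build time. A sketch, demoed against a throwaway file:

```python
import os
import tempfile

def get_install_requirements(path):
    """Parse a requirements file without importing pip (sketch of the fix)."""
    with open(path) as f:
        return [line.strip() for line in f
                if line.strip() and not line.lstrip().startswith("#")]

# demo against a throwaway file
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("requests\n# a comment\nsix>=1.0\n")
reqs = get_install_requirements(tmp.name)
os.unlink(tmp.name)
```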

Error when using pull query

This is my first issue, so sorry if I missed something.

I have encountered an issue when reading data from a stream using a pull query (I'm new to KSQL, so sorry if the problem is actually the query).

I have a test stream with only 4 rows. When I try to read it with the code below, it outputs the rows but then throws a JSONDecodeError.

def consume_stream(stream: str, n_messages: int = 10) -> Generator[dict, None, None]:
    load_dotenv(find_dotenv())
    url = os.environ["KSQL_HOST"]

    client = KSQLAPI(url)

    client.query("SET 'auto.offset.reset' = 'earliest';")
    query = f"select * from {stream}"

    return client.query(query, return_objects=True)

The traceback (a screenshot rendered by rich, hence the pretty output; I can share the raw text if needed) shows that the row returned is empty.

A simple check like this fixes the issue

row = row.replace(",\n", "").replace("]\n", "")
if not row:
    return None

As a note, if I use client.query(query) instead (without return_objects=True), it still raises an error, but now it is StopIteration, probably for the same reason.

SELECT statements call incorrect endpoint for KSQLDB 0.19.0

Queries like:

from ksql import KSQLAPI
client = KSQLAPI('http://localhost:8088')
client.ksql('SELECT * FROM stream_name EMIT CHANGES LIMIT 5;')

don't work and throw the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/ksql/api.py", line 170, in _request
    r = urllib.request.urlopen(req, timeout=self.timeout)
  File "/usr/local/lib/python3.9/urllib/request.py", line 214, in urlopen
    return opener.open(url, data, timeout)
  File "/usr/local/lib/python3.9/urllib/request.py", line 523, in open
    response = meth(req, response)
  File "/usr/local/lib/python3.9/urllib/request.py", line 632, in http_response
    response = self.parent.error(
  File "/usr/local/lib/python3.9/urllib/request.py", line 561, in error
    return self._call_chain(*args)
  File "/usr/local/lib/python3.9/urllib/request.py", line 494, in _call_chain
    result = func(*args)
  File "/usr/local/lib/python3.9/urllib/request.py", line 641, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 400: Bad Request

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/ksql/client.py", line 43, in ksql
    return self.sa.ksql(ksql_string, stream_properties=stream_properties)
  File "/usr/local/lib/python3.9/site-packages/ksql/api.py", line 65, in ksql
    r = self._request(endpoint="ksql", sql_string=ksql_string, stream_properties=stream_properties)
  File "/usr/local/lib/python3.9/site-packages/ksql/api.py", line 178, in _request
    raise KSQLError(content.get("message"), content.get("error_code"), content.get("stackTrace"))
ksql.errors.KSQLError: ("The following statement types should be issued to the websocket endpoint '/query':\n\t* PRINT\n\t* SELECT", 40002, None)

P.S. Queries like

client.ksql('SHOW STREAMS;')

work fine.
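The server's own error message spells out the routing rule: PRINT and SELECT belong on the /query endpoint, everything else on /ksql; in ksql-python terms, use client.query() for SELECT statements and client.ksql() for the rest. A small sketch of that rule:

```python
def endpoint_for(statement):
    """Pick the REST endpoint per the server's rule: PRINT/SELECT -> /query, else /ksql."""
    head = statement.lstrip().split(None, 1)[0].rstrip(";").upper()
    return "query" if head in ("SELECT", "PRINT") else "ksql"
```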

socket.timeout() exception is being thrown when using 'use_http2=True'

When I use the use_http2=True parameter in the query function, data is retrieved, but when the Kafka stream stops sending data for more than 5 seconds the Python code throws a socket.timeout() exception. When I don't pass the extra parameter, the query keeps delivering data and waits for more without throwing.

Which KSQL version is supported?

Please explain in the readme which KSQL version is supported and tested.

Important as KSQL is evolving fast and was not backwards compatible in the past (I think this changed now with its GA release).

I wonder if it already works with KSQL 5.0 (part of Confluent Platform 5.0)?

ImportError: cannot import name 'Iterable' from 'collections'

I wrote a simple python client that connects to local ksql:

import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://primary-ksqldb-server:8088') 

and I faced the following error:

ImportError: cannot import name 'Iterable' from 'collections' (/opt/homebrew/Caskroom/miniconda/base/envs/mask/lib/python3.12/collections/__init__.py)

python 3.12
ksql-0.10.2
