timeplus-io / gluon
Python SDK for Timeplus REST API (not for Proton)
License: Apache License 2.0
We should consider a better way to reuse sessions to improve performance.
When running 40 ingestions and 40 queries in parallel, the following errors are reported:
2022-05-21T06:07:08.303417+0000 - ERROR - http post failed HTTPConnectionPool(host='172.31.50.206', port=8000): Max retries exceeded with url: /api/v1beta1/streams/github_6/ingest (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e322cf430>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))
2022-05-21T06:07:08.306757+0000 - ERROR - http get failed HTTPConnectionPool(host='172.31.50.206', port=8000): Max retries exceeded with url: /api/v1beta1/queries/55ff1c73-47c9-4303-9346-c91f65054d38 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e322d4610>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))
2022-05-21T06:07:08.309252+0000 - ERROR - http get failed HTTPConnectionPool(host='172.31.50.206', port=8000): Max retries exceeded with url: /api/v1beta1/queries/348b2a28-d182-4e6d-a9c1-118d3d59a7f2 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e322d4340>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))
@zliang-min has already done some investigation:
https://stackoverflow.com/questions/30943866/requests-cannot-assign-requested-address-out-of-ports
Right, gluon is not using Session; see e.g. https://github.com/timeplus-io/gluon/blob/develop/python/timeplus/env.py#L182
I think it's mostly the ingestion causing the problem in your case. I wonder if calling r.close() explicitly would help. Maybe you can try adding r.close() to the methods in the env.py file and see if that helps (but I am not sure how this would impact the performance, e.g. it might impact keep-alive).
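The Session-reuse idea can be sketched like this (a minimal illustration, not gluon's actual code; the helper name and retry/pool settings are assumptions):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session(pool_size: int = 40, retries: int = 3) -> requests.Session:
    """Create one shared Session so parallel ingest/query calls reuse
    TCP connections instead of opening a new one per request."""
    session = requests.Session()
    retry = Retry(total=retries, backoff_factor=0.5,
                  status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(pool_connections=pool_size,
                          pool_maxsize=pool_size,
                          max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```

Mounting one HTTPAdapter with a connection pool lets the parallel ingest and query calls reuse TCP connections, which avoids exhausting ephemeral ports (the Errno 99 above).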
After I upgraded to 0.2.1 and ran the same Python script, I got many errors:
2022-09-22T01:33:20.515584+0000 - ERROR - failed to recieve data from websocket
error Expecting value: line 1 column 1 (char 0)
2022-09-22T01:33:20.633927+0000 - ERROR - failed to recieve data from websocket
error Expecting value: line 1 column 1 (char 0)
2022-09-22T01:33:20.755501+0000 - ERROR - failed to recieve data from websocket
error Expecting value: line 1 column 1 (char 0)
With more and more interface changes, using generated code is a more efficient way to build the SDK.
When there are multiple environments, the API key of a later environment will override the previous one.
Support more source types, including:
source = (
KafkaSource()
.name(name)
.properties(
KafkaProperties()
.topic("github_events")
.brokers(BROKER)
.sasl("plain")
.username(KAFKA_API_KEY)
.password(KAFKA_SECRET_KEY)
)
)
source.connection(SourceConnection().stream("kafka").auto_create(True))
source.create().start()
The source is created and started, but there is no stream, and there is no Neutron error.
I created a stream with a single cnt column (actually the stream is created automatically with a Timeplus Sink).
If I use the Python SDK to insert data, I cannot set the _tp_time column; otherwise the insert fails:
s = (
Stream().name("a")
.column(StreamColumn().name("_tp_time").type("datetime64(3, 'UTC')"))
.column(StreamColumn().name("cnt").type("uint64"))
)
s.insert([["2022-09-09T09:00:00",9595]])
Error
timeplus.error.TimeplusAPIError: http method post, response code 500, failed to send http post due to {"code":500,"message":"failed to ingest data to stream a: code: 27, message: Code: 27. DB::ParsingException: Cannot parse input: expected '"' before: '-09-09T09:00:00",9595]]': (while reading the value of key cnt): \nRow 1:\nColumn 0, name: cnt, type: uint64, ERROR: text "[\u003cDOUBLE QUOTE\u003e2022-09-" is not like uint64\n\n: (at row 1)\n. (CANNOT_PARSE_INPUT_ASSERTION_FAILED) (version 1.0.82)"}
If I remove _tp_time, the data can be inserted:
s = (
Stream().name("a")
.column(StreamColumn().name("cnt").type("uint64"))
)
s.insert([[950]])
In my case, the stream is created with SQL, not via the Python code:
s = (
Stream()
.name("github_events")
)
When I insert data, the console output is
post http://localhost:8000/api/v1beta1/streams/github_events/ingest
insert {'columns': [], 'data': [['id', '20835192348'], ['created_at', '2022-03-20T04:03:40'], ['type', 'CreateEvent'], ['repo', 'Bobsang132/GIT-BASH'], ['payload', {'ref': 'main', 'ref_type': 'branch', 'master_branch': 'main', 'description': None, 'pusher_type': 'user'}]]}
insert success
But no data is actually inserted. It seems that I need to declare the columns even when the stream is created elsewhere. That is counterintuitive, since the column names are already given in the insert call.
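Based on the behavior described, the workaround can be sketched as declaring the column order up front and converting each event dict into a positional row (the column list and helper below are illustrative, not part of gluon):

```python
import json

# Columns must match the stream definition; they are declared explicitly
# here even though the stream was created elsewhere (order is assumed).
COLUMNS = ["id", "created_at", "type", "repo", "payload"]

def to_row(event: dict) -> list:
    """Flatten an event dict into a row whose order matches COLUMNS,
    serializing nested JSON values to strings for string columns."""
    row = []
    for col in COLUMNS:
        value = event[col]
        if isinstance(value, (dict, list)):
            value = json.dumps(value)  # string columns need JSON text
        row.append(value)
    return row
```

One would then declare the same columns via `.column(StreamColumn()...)` on the Stream object, as in the earlier snippets, and call `s.insert([to_row(e) for e in events])`.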
Currently the query result comes from the WebSocket, where every value is a string. It would be better to convert the strings to their real types according to https://github.com/timeplus-io/docs/blob/main/docs/datatypes.md
I got a Python datetime object and wanted to insert data via
s.insert([ ["id",e.id],["created_at",e.created_at],..)
I got this error message:
failed to insert Object of type datetime is not JSON serializable
The workaround is to call e.created_at.isoformat(). Ideally the Python SDK would accept native datetime objects.
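Inside the SDK, accepting native datetimes could look like this sketch, using `json.dumps` with a `default` hook (the function names are hypothetical, not gluon's API):

```python
import json
from datetime import datetime, date

def _json_default(value):
    """Fallback serializer: turn datetime/date values into ISO-8601
    strings so callers can pass native datetime objects to insert()."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable")

def encode_rows(rows) -> str:
    """Serialize an insert payload, accepting native datetimes."""
    return json.dumps(rows, default=_json_default)
```

This keeps the caller-side workaround (`isoformat()`) working while no longer requiring it.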
(gluon) jameshao@JamesdeMacBook-Pro python % python3 -m build
For a solution, the user might create an app resource loader: an application that creates these resources from a configuration file (JSON, YAML, conf).
We also need documentation explaining how to use the environment.
swagger_client.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Date': 'Thu, 22 Jun 2023 20:11:26 GMT', 'Content-Type': 'application/json; charset=utf-8', 'Content-Length': '111', 'Connection': 'keep-alive', 'Access-Control-Allow-Credentials': 'true', 'Access-Control-Allow-Headers': 'Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With', 'Access-Control-Allow-Methods': 'POST, HEAD,PATCH, DELETE, OPTIONS, GET, PUT', 'Access-Control-Allow-Origin': '*', 'X-Request-Id': 'H0eB3RGybTYSUHAuH1TP4'})
HTTP response body: b'{"code":400,"message":"bad event: json: cannot unmarshal string into Go value of type map[string]interface {}"}'
Add JSON datatype support.
Today, if Neutron gets restarted, the streaming SQL will be cancelled.
For the Python SDK client, it would be great if the query could be resubmitted, or to let the client decide how to resubmit.
Today, when the client uses the Python SDK to run a streaming query and Neutron gets restarted, the query just hangs.
Today, the Python SDK just shows many log messages with the print method. That is overwhelming when inserting a lot of data.
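A common fix (a sketch, not the SDK's current behavior) is to route messages through the standard logging module so users control verbosity:

```python
import logging

logger = logging.getLogger("timeplus")

def log_insert(payload):
    """Log at DEBUG instead of print(), so bulk inserts stay quiet
    unless the user opts in to verbose output."""
    logger.debug("insert %s", payload)
```

Callers who want the old verbose behavior can enable it with `logging.basicConfig(level=logging.DEBUG)`.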
As title.
The Python SDK should support both single-tenant and multi-tenant deployments
switch to API key instead
The README of the Python SDK shows Environment().address(api_address).workspace(workspace).apikey(api_key)
This is confusing for new users: they don't know what api_address is or what the workspace is.
Most of our users are using https://us.timeplus.cloud, so this should be the default value
api_key = os.environ.get("TIMEPLUS_API_KEY") #60 characters
workspace = os.environ.get("TIMEPLUS_WORKSPACE") #8 characters, such as weioctg9, instead of MyWorkspaceName
env = Environment().workspace(workspace).apikey(api_key)
# if you are not using https://us.timeplus.cloud, call address(..), such as
# Environment().address("https://my.timeplus.cloud")
Add a check in workspace() for whether the input is 8 characters; if not, show an error, e.g. "Did you set the workspace name? Please set the workspace ID (usually 8 characters long)"
Add a check in apikey() for whether the input is 60 characters; if not, show an error, e.g. "The apikey should be 60 characters"
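The proposed checks could be sketched as standalone validators (the lengths are taken from the comments above; the exact messages are suggestions):

```python
def check_workspace(workspace_id: str) -> str:
    """Validate that the value looks like a workspace ID (8 chars),
    not the human-readable workspace name."""
    if len(workspace_id) != 8:
        raise ValueError(
            "Did you set the workspace name? Please set the workspace ID "
            "(usually 8 characters long)")
    return workspace_id

def check_apikey(api_key: str) -> str:
    """Validate the API key length (60 characters)."""
    if len(api_key) != 60:
        raise ValueError("The apikey should be 60 characters")
    return api_key
```

workspace() and apikey() would call these before storing the values, so misconfiguration fails immediately instead of at the first request.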
somehow similar to #3
I got a dict from the Python API and inserted the column via ["jsonColumn",jsonValue]
It generates JSON to the REST API like ['payload', {'push_id': 93968
I got "insert success", but the data is actually not inserted.
I guess I need to turn the JSON object into a string, e.g.
import json
["payload",json.dumps(e.payload)]
https://ibis-project.org/posts/ibis-version-8.0.0-release/ The new Ibis release starts to support streaming; this is a good candidate for integration with the Python community, which we should support.
The latest version today is 1.3.0b2 ("b" means beta). We should release an official version before Sep 15. Soft GA was in early Aug.
E.g. I'd like to use
GeneratorField().name("device").type("string").range(["device_0", "device_1", "device_2"])
However, only limit is supported.
Add a file loader as an example of gluon.
Add an interface to store metrics to Timeplus.
I'd like to use gluon to create, list, delete and update a view or materialized view
support topology
https://docs.timeplus.com/rest.html#tag/Topology-v1beta2
When working with other community tools such as LangChain, Querybook, and Superset, SQLAlchemy plays the driver role.
It would be better if the Timeplus Python SDK supported it.
s.create() exception, error = 'api_key_id'
My code threw an exception:
import sys

# imports assumed from the timeplus (gluon) package
from timeplus import Env, Stream, StreamColumn

def create_stream(stream_name, tp_schema, tp_host, tp_port, client_id, client_secret):
env = (
Env()
.schema(tp_schema).host(tp_host).port(tp_port)
#.login(client_id=client_id, client_secret=client_secret)
)
Env.setCurrent(env)
try:
s = (
Stream()
.name(stream_name)
.column(StreamColumn().name("id").type("string"))
.column(StreamColumn().name("created_at").type("datetime"))
.column(StreamColumn().name("actor").type("string"))
.column(StreamColumn().name("type").type("string"))
.column(StreamColumn().name("repo").type("string"))
.column(StreamColumn().name("payload").type("string"))
.ttl_expression("created_at + INTERVAL 2 WEEK")
)
if(s.get() is None):
s.create()
print(f"Created a new stream {s.name()}")
return stream_name
except Exception as e:
print(f"s.create() exception, error = {e}")
sys.exit(f"Failed to list or create data streams from {tp_schema}://{tp_host}:{tp_port}. Please make sure you are connecting to the right server.")
.neutron.yaml:
enable-authentication: false
# uncomment/enable the following setting for playground
#frontend-applications: playground
#proton-username: demo
#proton-password: demo@t+
#proton-query-black-list:
# - insert
# - drop
# - system.
# - create
#proton-query-timeout: 300
# except playground, other deployments need those auth0 settings
auth0-domain: timeplus.us.auth0.com
auth0-callback-url: https://FIXME/callback
auth0-client-id:
auth0-client-secret:
auth0-audience: https://FIXME.beta.timeplus.com/api/v1beta1
auth0-api-client-id:
auth0-api-client-secret:
I checked neutron.log with tail -f; no error or new log is emitted when the 'api_key_id' exception is caught.
DDL
CREATE STREAM github_events(
id string,
created_at string,
type string,
repo string,
_tp_time datetime64 default to_datetime(created_at)
)
Python
s.insert([
["id",e.id],["created_at",e.created_at.isoformat()],["type",e.type],["repo",e.repo.name]#,["payload",json.dumps(e.payload)]
])
Error
post http://localhost:8000/api/v1beta1/streams/github_events/ingest
insert {'columns': ['id', 'created_at', 'type', 'repo'], 'data': [['id', '20839378587'], ['created_at', '2022-03-20T18:07:17'], ['type', 'PushEvent'], ['repo', 'b-tekinli/b-tekinli']]}
failed to insert 500 {"code":0,"message":"failed to ingest data to stream github_events: request failed with status code 400, response body {"code":27,"error_msg":"Code: 27. DB::ParsingException: Cannot parse input: expected ',' before: '],[\"created_at\",\"2022-03-20T18:07:17\"],[\"type\",\"PushEvent\"],[\"repo\",\"b-tekinli\/b-tekinli\"]]': \nRow 1:\nColumn 0, name: id, type: string, parsed text: \u003cEMPTY\u003eERROR\nCode: 26. DB::ParsingException: Cannot parse JSON string: expected opening quote: (while reading the value of key id). (CANNOT_PARSE_QUOTED_STRING) (version 1.0.20)\n\n: While executing ParallelParsingBlockInputFormat: (at row 1)\n. (CANNOT_PARSE_INPUT_ASSERTION_FAILED) (version 1.0.20)","request_id":""}\n"}
The get-by-name call does not work; it calls '/v1beta2/streams/:name' to get the stream.
This issue impacts both stream and view objects.
The current workaround is to list and then filter, which works.
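The list-and-filter workaround amounts to a client-side lookup; a minimal sketch over listed resource dicts (the dict shape is an assumption):

```python
def find_by_name(resources, name):
    """Client-side filter over a listed collection of stream/view
    objects, used instead of GET /v1beta2/streams/:name."""
    for resource in resources:
        if resource.get("name") == name:
            return resource
    return None
```

With gluon, one would fetch the full list of streams (or views) once and pass it to this helper, avoiding the broken per-name endpoint.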
In my test with car_live_data, the console query reaches about 700 eps, while the Python SDK only achieves 7-10 eps, which is about 100 times slower.
We need to dig out the performance bottleneck here.
support sink and source resource management
In gluon 0.1.1, for any HTTP request, if the token is invalid, the SDK will try to refresh the token, up to 24 times.
However, if the first token is invalid, I guess we can skip the token refresh and show the end user an error that the token is not valid.
The main use case is that the user gets a valid token and needs to run the script for a long time. We should auto-refresh the token so that the user doesn't need to stop the script and rerun. But if the token is invalid from the beginning, this is considered a misconfiguration.
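That policy can be sketched as: fail fast when the very first token is rejected, but auto-refresh (bounded, e.g. 24 times as today) once a token has worked. The class and helper names below are hypothetical:

```python
class TokenManager:
    """Refresh expired tokens during long-running scripts, but treat an
    invalid *initial* token as a misconfiguration and fail immediately."""

    def __init__(self, refresh_fn, max_refreshes: int = 24):
        self._refresh_fn = refresh_fn      # callable returning a new token
        self._max_refreshes = max_refreshes
        self._ever_succeeded = False

    def run(self, request_fn, token):
        """request_fn(token) returns a result or raises PermissionError."""
        for _ in range(self._max_refreshes + 1):
            try:
                result = request_fn(token)
                self._ever_succeeded = True
                return result
            except PermissionError:
                if not self._ever_succeeded:
                    # first token never worked: misconfig, do not retry
                    raise
                token = self._refresh_fn()
        raise PermissionError("token refresh limit exceeded")
```

The `_ever_succeeded` flag is what distinguishes "token expired mid-run" (retry) from "token was wrong from the start" (surface the error to the user).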