
prefect-sqlalchemy's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-sqlalchemy


Visit the full docs here to see additional examples and the API reference.

The prefect-sqlalchemy collection makes it easy to connect to a database in your Prefect flows. Check out the examples below to get started!

Getting started

Integrate with Prefect flows

Prefect and SQLAlchemy are a data powerhouse duo. With Prefect, your workflows are orchestratable and observable, and with SQLAlchemy, your databases are a snap to handle! Get ready to experience the ultimate data "flow-chemistry"!

To set up a table, use the execute and execute_many methods. Then, use the fetch_many method to retrieve data in a stream until there's no more data.

By using the SqlAlchemyConnector as a context manager, you can make sure that the SQLAlchemy engine and any connected resources are closed properly after you're done with them.

Be sure to install prefect-sqlalchemy and save your credentials to a Prefect block to run the examples below!

!!! note "Async support"

`SqlAlchemyConnector` also supports async workflows! Just be sure to save, load, and use an async driver as in the example below.

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```

=== "Sync"

```python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)  # collect individual rows, not nested batches
    return all_rows

@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER")
```

=== "Async"

```python
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector

@task
async def setup_table(block_name: str) -> None:
    async with await SqlAlchemyConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    async with await SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)  # collect individual rows, not nested batches
    return all_rows

@flow
async def sqlalchemy_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    asyncio.run(sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER"))
```
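The comment in `fetch_data` says that repeated `fetch*` calls with the same operation skip re-execution and return the next batch of results. The sketch below illustrates that idea with the standard-library `sqlite3` module. The `CachingFetcher` class, its `reset` method, and the choice to key the cache on a hash of the operation and parameters are hypothetical simplifications, not the connector's actual implementation.

```python
import sqlite3
from typing import Any, Dict, List, Optional, Tuple


class CachingFetcher:
    """Hypothetical sketch: keep one open cursor per (operation, parameters) pair."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        # Maps a hash of (operation, parameters) to a cursor that is mid-stream.
        self._cursors: Dict[int, sqlite3.Cursor] = {}

    @staticmethod
    def _key(operation: str, parameters: Optional[Dict[str, Any]]) -> int:
        return hash((operation, tuple(sorted((parameters or {}).items()))))

    def fetch_many(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        size: int = 1,
    ) -> List[Tuple[Any, ...]]:
        key = self._key(operation, parameters)
        if key not in self._cursors:
            # First call for this operation: execute it and remember the cursor.
            self._cursors[key] = self._conn.execute(operation, parameters or {})
        # Later calls skip execution and continue reading the same cursor.
        return self._cursors[key].fetchmany(size)

    def reset(self) -> None:
        """Forget cached cursors so the next call re-executes its operation."""
        self._cursors.clear()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (name varchar, address varchar)")
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (:name, :address)",
        [{"name": n, "address": "Highway 42"} for n in ("Marvin", "Ford", "Unknown")],
    )
    fetcher = CachingFetcher(conn)
    all_rows = []
    while True:
        batch = fetcher.fetch_many("SELECT * FROM customers", size=2)
        if not batch:
            break
        all_rows.extend(batch)
    print(all_rows)
```

Because the cursor is cached rather than re-created, the loop terminates once the stream is exhausted; without the cache, every call would re-run the query and return the first two rows forever.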

Resources

For more tips on how to use tasks and flows provided in a Prefect integration library, check out the Prefect docs on using integrations.

Installation

Install prefect-sqlalchemy with pip:

```bash
pip install prefect-sqlalchemy
```

Requires an installation of Python 3.8 or higher.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

The tasks in this library are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving credentials to a block

To use the `load` method on blocks, you must first save a block document, either through code or through the UI.

To save a block document through code, create a short script like the following, replacing the placeholders:

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```

Congrats! You can now easily load the saved block, which holds your credentials:

```python
from prefect_sqlalchemy import SqlAlchemyConnector

connector = SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
```

The required keywords depend upon the desired driver. For example, SQLite requires only the driver and database arguments:

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```
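The connection components correspond to the parts of a SQLAlchemy database URL, `dialect+driver://username:password@host:port/database`. The stdlib-only helper below is a hypothetical sketch of that mapping, assuming the driver enum values are strings such as `"postgresql+psycopg2"` and `"sqlite+pysqlite"`; it is not the connector's code. It shows why SQLite needs only a driver and a database path, while PostgreSQL also takes credentials, a host, and a port.

```python
from typing import Optional
from urllib.parse import quote


def build_url(
    driver: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Hypothetical helper: assemble a SQLAlchemy-style URL from connection components."""
    netloc = ""
    if username:
        # Percent-escape credentials so characters such as '@' or '/' don't break the URL.
        netloc = quote(username, safe="")
        if password:
            netloc += ":" + quote(password, safe="")
        netloc += "@"
    if host:
        netloc += host
        if port is not None:
            netloc += f":{port}"
    # With no host (SQLite), this yields e.g. 'sqlite+pysqlite:///file.db'.
    return f"{driver}://{netloc}/{database}"


print(build_url("sqlite+pysqlite", "DATABASE-PLACEHOLDER.db"))
# sqlite+pysqlite:///DATABASE-PLACEHOLDER.db
print(build_url("postgresql+psycopg2", "db", username="u", password="p@ss/word", host="localhost", port=5432))
# postgresql+psycopg2://u:p%40ss%2Fword@localhost:5432/db
```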

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://orion-docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_sqlalchemy
```

A list of available blocks in prefect-sqlalchemy and their setup instructions can be found here.

Feedback

If you encounter any bugs while using prefect-sqlalchemy, please open an issue in the prefect-sqlalchemy repository.

If you have any questions or issues while using prefect-sqlalchemy, you can find help in the Prefect Community Slack.

Feel free to star or watch prefect-sqlalchemy for updates!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-sqlalchemy, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     ```bash
     pip install -e ".[dev]"
     ```
  4. Make desired changes
  5. Add tests
  6. Add an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     ```bash
     pre-commit install
     ```
  8. git commit, git push, and create a pull request

prefect-sqlalchemy's People

Contributors

ahuang11, chrisguidry, dependabot[bot], desertaxle, discdiver, mastertilla, obendidi, prefect-collection-synchronizer[bot], urimandujano, zzstoatzz


prefect-sqlalchemy's Issues

cursor connection closed before reading result with SyncDriver.MSSQL_PYODBC

When using SyncDriver.MSSQL_PYODBC with DatabaseCredentials to connect to SQL Server, calling

result = sqlalchemy_query("SELECT * FROM dbo.patients", sqlalchemy_credentials)

returns the following error:

  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/prefect/engine.py", line 1041, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/prefect_sqlalchemy/database.py", line 138, in sqlalchemy_query
    return result.fetchall()
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/result.py", line 1024, in fetchall
    return self._allrows()
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/result.py", line 401, in _allrows
    rows = self._fetchall_impl()
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/cursor.py", line 1804, in _fetchall_impl
    return self.cursor_strategy.fetchall(self, self.cursor)
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/cursor.py", line 981, in fetchall
    self.handle_exception(result, dbapi_cursor, e)
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/cursor.py", line 941, in handle_exception
    result.connection._handle_dbapi_exception(
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/ge-hall/Library/Python/3.8/lib/python/site-packages/sqlalchemy/engine/cursor.py", line 977, in fetchall
    rows = dbapi_cursor.fetchall()
sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) The cursor's connection has been closed.

I'm not sure whether this is related to server-side cursors in SQL Server, but the cause appears to be the engine.dispose() call in database.py's _execute, which closes the connection pool before the results are consumed.

This simple PoC reproduces the same error:

  from sqlalchemy import create_engine
  import sqlalchemy
  
  def connect():
      engine = create_engine(
                  "mssql+pyodbc://username:password"
                  "@portainer:1433/HealthDB?driver=ODBC+Driver+17+for+SQL+Server",
                  echo=True,
                  )
  
      try:
          with engine.connect() as conn:
  
              result = conn.execute("SELECT * FROM dbo.patients")
      except Exception as e:
          print(e)
      finally:
          engine.dispose()
      return result
  
  res = connect()
  
  for row in res:
      print(row)
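The same failure mode can be reproduced without SQL Server using the standard-library `sqlite3` driver, where reading from a cursor after its connection has been closed raises `sqlite3.ProgrammingError`. This is an analogy, not the prefect-sqlalchemy code, and the `patients` table is made-up demo data. It contrasts returning a live cursor with materializing the rows while the connection is still open; in the SQLAlchemy PoC above, the equivalent fix would be to call `result.fetchall()` inside the `with engine.connect()` block, before `engine.dispose()` runs.

```python
import sqlite3
from typing import Any, List, Tuple


def _open() -> sqlite3.Connection:
    """Open an in-memory database with a small demo table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE patients (id integer, name varchar)")
    conn.executemany("INSERT INTO patients VALUES (?, ?)", [(1, "Arthur"), (2, "Trillian")])
    return conn


def query_lazily() -> sqlite3.Cursor:
    """Mirrors the bug: the connection is closed before the caller reads the cursor."""
    conn = _open()
    try:
        cursor = conn.execute("SELECT * FROM patients ORDER BY id")
    finally:
        conn.close()  # analogous to engine.dispose() running too early
    return cursor


def query_eagerly() -> List[Tuple[Any, ...]]:
    """Fix: consume the rows while the connection is still open."""
    conn = _open()
    try:
        return conn.execute("SELECT * FROM patients ORDER BY id").fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    try:
        query_lazily().fetchall()
    except sqlite3.ProgrammingError as exc:
        print(f"lazy read failed: {exc}")
    print(query_eagerly())
```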

Add collection sync workflow using cruft

Add cruft to the repo so this collection can be kept in sync with the original template. Cruft can be added by running cruft link; note that a starting commit must be specified. The commit of prefect-collection-template closest to this repo's generation date is a good default.

Password visible in block configuration

When using the new SqlAlchemyConnector to create a block, the password is displayed in clear text in the frontend.

This is a serious security flaw.

Json fields confusion when editing credentials blocks with orion-ui

Since the 0.1.1 update allows DatabaseCredentials to be saved, loaded, and edited like any other block, including in the UI (which is going to be an immensely convenient feature), I've tried using the web view to create new credentials, and the editor appears to have a few quirks.

The interface seems confused about the driver and url fields, which are both unions of strings with other types. This appears to make the form generator used by orion-ui fall back to a JSON value editor for them, even though only query and connect_args should use one. The frontend will not accept the URL unless it parses as a JSON value (meaning the string must be quoted), but the backend then treats the quotation marks as part of the URL itself.

Also, whenever one of the four JSON fields is edited, connect_args changes to mirror whichever field was edited last, and the interface will not let users leave it empty. Validation of all these fields also depends only on the last one, so the frontend (though not the backend) considers them all valid as long as connect_args contains something like {}.

I know this isn't a prefect-sqlalchemy bug per se, but it affects its functionality. I will file a bug in the main Prefect repo as well.
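The quoting mismatch described above can be illustrated with Python's `json` module: a JSON value editor accepts a plain string only once it is quoted, decoding it as JSON removes the quotes, but using the editor text verbatim keeps them inside the URL. This is a simplified illustration of the reported symptom, not orion-ui or Prefect code, and the URL is a made-up example.

```python
import json

# Text a user must type into the JSON value editor for the url field:
# the editor only accepts it once it is quoted as a JSON string.
editor_text = '"postgresql+psycopg2://user@localhost:5432/db"'

# Decoding the text as JSON yields the intended URL, without quotes.
intended_url = json.loads(editor_text)

# Using the editor text verbatim keeps the quotes as part of the URL,
# which matches the malformed value described in the report.
stored_url = editor_text

print(intended_url)  # postgresql+psycopg2://user@localhost:5432/db
print(stored_url)    # "postgresql+psycopg2://user@localhost:5432/db"
```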

sqlalchemy_query `limit` parameter is confusing

Description

The current limit parameter description is

limit: The number of rows to fetch.

As a result, calling

sqlalchemy_query(
    "SELECT * FROM customers WHERE name = :name;",
    sqlalchemy_credentials,
    params={"name": "Marvin"},
    limit=3
)

might be interpreted as running something like

SELECT *
FROM customers
WHERE name = 'Marvin'
LIMIT 3;

whereas in reality it appears to run the full SELECT and then limit the number of returned rows programmatically:

SELECT *
FROM customers
WHERE name = 'Marvin';

Expected Behavior

Providing a value for limit should limit the query at the database level, not at the client level.

OR

The function should not accept a limit value at all, leaving limit/offset behaviour to the query itself.

(I can submit a pull request if you think this should/could be fixed)
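The two behaviours can be contrasted with the standard-library `sqlite3` module. `limit_client_side` executes the unrestricted query and keeps only the first rows with `fetchmany`, which is roughly what the issue describes (an assumption about the internals), while `limit_in_query` binds the limit into the SQL so the database itself stops early. Both function names and the demo data are hypothetical.

```python
import sqlite3
from typing import Any, List, Tuple


def limit_client_side(conn: sqlite3.Connection, name: str, limit: int) -> List[Tuple[Any, ...]]:
    """Execute the unrestricted SELECT and keep only the first `limit` rows in Python."""
    cursor = conn.execute(
        "SELECT * FROM customers WHERE name = :name ORDER BY rowid", {"name": name}
    )
    return cursor.fetchmany(limit)


def limit_in_query(conn: sqlite3.Connection, name: str, limit: int) -> List[Tuple[Any, ...]]:
    """Push the limit into the SQL so the database returns at most `limit` rows."""
    cursor = conn.execute(
        "SELECT * FROM customers WHERE name = :name ORDER BY rowid LIMIT :max_rows",
        {"name": name, "max_rows": limit},
    )
    return cursor.fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (name varchar, address varchar)")
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (?, ?)",
        [("Marvin", f"Highway {n}") for n in range(5)],
    )
    print(limit_client_side(conn, "Marvin", 3))
    print(limit_in_query(conn, "Marvin", 3))
```

Both return the same rows here; the difference is where the work happens, which matters once the unrestricted result set is large or has to travel over the network.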

Reproduction

I enabled SQLAlchemy's engine logging so the executed query is logged automatically:

import logging
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
from prefect_sqlalchemy.database import sqlalchemy_query
from prefect import flow

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

@flow
def sqlalchemy_query_flow():

    sqlalchemy_credentials = DatabaseCredentials(
        driver=AsyncDriver.POSTGRESQL_ASYNCPG,
        username="prefect",
        password="prefect_password",
        database="postgres",
    )

    result = sqlalchemy_query(
        "SELECT * FROM customers WHERE name = :name;",
        sqlalchemy_credentials,
        params={"name": "Marvin"},
        limit=3
    )
    return result

sqlalchemy_query_flow()

will log

12:13:14.029 | INFO    | prefect.engine - Created flow run 'tough-donkey' for flow 'sqlalchemy-query-flow'
12:13:14.031 | INFO    | Flow run 'tough-donkey' - Using task runner 'ConcurrentTaskRunner'
12:13:14.207 | INFO    | Flow run 'tough-donkey' - Created task run 'sqlalchemy_query-ff7683f9-0' for task 'sqlalchemy_query'
12:13:14.277 | INFO    | sqlalchemy.engine.Engine - select pg_catalog.version()
12:13:14.278 | INFO    | sqlalchemy.engine.Engine - [raw sql] ()
12:13:14.285 | INFO    | sqlalchemy.engine.Engine - select current_schema()
12:13:14.286 | INFO    | sqlalchemy.engine.Engine - [raw sql] ()
12:13:14.296 | INFO    | sqlalchemy.engine.Engine - show standard_conforming_strings
12:13:14.298 | INFO    | sqlalchemy.engine.Engine - [raw sql] ()
12:13:14.306 | INFO    | sqlalchemy.engine.Engine - BEGIN (implicit)
12:13:14.307 | INFO    | sqlalchemy.engine.Engine - SELECT * FROM customers WHERE name = marvin
12:13:14.308 | INFO    | sqlalchemy.engine.Engine - [generated in 0.00128s] ()
12:13:20.583 | INFO    | sqlalchemy.engine.Engine - COMMIT
12:13:20.892 | INFO    | Task run 'sqlalchemy_query-ff7683f9-0' - Finished in state Completed()
12:13:20.921 | INFO    | Flow run 'tough-donkey' - Finished in state Completed('All states completed.')

Environment

Python 3.9.12

Prefect

Version:             2.0b8
API version:         0.7.0
Python version:      3.9.12
Git commit:          4b8dfc35
Built:               Fri, Jul 8, 2022 8:53 AM
OS/Arch:             darwin/x86_64
Profile:             default
Server type:         hosted

prefect-sqlalchemy==0.1.0

SqlAlchemyConnector.fetch_all throws exception

A sample illustrates the problem:

with SqlAlchemyConnector(connection_info=ConnectionComponents(<stuff goes here>)) as connector:
    connector.fetch_all("select * from mytable")

This has failed since version 0.2.3, which introduced lazy loading of the engine. PR46 moved self._unique_results = {} into get_engine. However, when fetch_all is called, the input_hash check runs before get_engine has been called, so self._unique_results has not been initialized yet.

Expectation / Proposal

Move the initialization of self._unique_results back to block_initialization.
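A minimal, stdlib-only sketch of the ordering problem follows. `BuggyConnector` and `FixedConnector` are hypothetical stand-ins that only borrow the names from the report (`get_engine`, `fetch_all`, `block_initialization`, `_unique_results`); the real connector does much more. Creating the cache inside `get_engine` leaves it undefined when `fetch_all` consults it first, while creating it during initialization, as proposed, avoids the error.

```python
from typing import Any, Dict, List


class BuggyConnector:
    """Hypothetical: the result cache is only created when the engine is lazily built."""

    def get_engine(self) -> str:
        self._unique_results: Dict[int, List[Any]] = {}
        return "engine"  # placeholder for a real SQLAlchemy engine

    def fetch_all(self, operation: str) -> List[Any]:
        input_hash = hash(operation)
        # The cache is consulted before get_engine() has ever run -> AttributeError.
        if input_hash in self._unique_results:
            return self._unique_results[input_hash]
        self.get_engine()
        rows: List[Any] = [operation]  # placeholder for real query results
        self._unique_results[input_hash] = rows
        return rows


class FixedConnector(BuggyConnector):
    """Hypothetical fix: create the cache during initialization."""

    def __init__(self) -> None:
        # Simplification: Prefect blocks run block_initialization after construction.
        self.block_initialization()

    def block_initialization(self) -> None:
        self._unique_results = {}

    def get_engine(self) -> str:
        # No longer (re)creates the cache; just returns the placeholder engine.
        return "engine"


if __name__ == "__main__":
    try:
        BuggyConnector().fetch_all("select * from mytable")
    except AttributeError as exc:
        print(f"buggy: {exc}")
    print(FixedConnector().fetch_all("select * from mytable"))
```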

Traceback / Example

Error when registering

I am getting an error when attempting to register this module's credentials as a block in my Orion server.

Prefect 2.2.0
Prefect-sqlalchemy 0.1.2

See logs:

root@workflow:/opt/workflow# prefect block register --module prefect_sqlalchemy.credentials
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.9/dist-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/dist-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/dist-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/dist-packages/prefect/cli/block.py", line 118, in register
    imported_module = import_module(name=module_name)
  File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 972, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 790, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/usr/local/lib/python3.9/dist-packages/prefect_sqlalchemy/__init__.py", line 2, in <module>
    from .credentials import DatabaseCredentials, AsyncDriver, SyncDriver  # noqa
  File "/usr/local/lib/python3.9/dist-packages/prefect_sqlalchemy/credentials.py", line 91, in <module>
    class DatabaseCredentials(Block):
  File "pydantic/main.py", line 283, in pydantic.main.ModelMetaclass.__new__
  File "/usr/lib/python3.9/abc.py", line 85, in __new__
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/dispatch.py", line 99, in _register_subclass_of_base_type
    register_type(cls)
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/dispatch.py", line 154, in register_type
    key = get_dispatch_key(cls)
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/dispatch.py", line 76, in get_dispatch_key
    dispatch_key = dispatch_key()
  File "/usr/local/lib/python3.9/dist-packages/prefect/blocks/core.py", line 199, in __dispatch_key__
    return block_schema_to_key(cls._to_block_schema())
  File "/usr/local/lib/python3.9/dist-packages/prefect/blocks/core.py", line 346, in _to_block_schema
    fields = cls.schema()
  File "pydantic/main.py", line 664, in pydantic.main.BaseModel.schema
  File "pydantic/schema.py", line 186, in pydantic.schema.model_schema
  File "pydantic/schema.py", line 580, in pydantic.schema.model_process_schema
  File "pydantic/schema.py", line 621, in pydantic.schema.model_type_schema
  File "pydantic/schema.py", line 254, in pydantic.schema.field_schema
  File "pydantic/schema.py", line 526, in pydantic.schema.field_type_schema
  File "pydantic/schema.py", line 847, in pydantic.schema.field_singleton_schema
  File "pydantic/schema.py", line 742, in pydantic.schema.field_singleton_sub_fields_schema
  File "pydantic/schema.py", line 526, in pydantic.schema.field_type_schema
  File "pydantic/schema.py", line 889, in pydantic.schema.field_singleton_schema
  File "pydantic/schema.py", line 594, in pydantic.schema.model_process_schema
  File "/usr/local/lib/python3.9/dist-packages/prefect/blocks/core.py", line 118, in schema_extra
    schema["block_type_slug"] = model.get_block_type_slug()
AttributeError: type object 'URL' has no attribute 'get_block_type_slug'
An exception occurred.

Update tests for SQLAlchemy 2

Expectation / Proposal

The tests started failing after SQLAlchemy 2 was released. We don't expect any functionality changes due to SQLAlchemy 2, but the tests should be updated.

Traceback / Example
