Code Monkey home page Code Monkey logo

prefect-snowflake's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-snowflake

PyPI

Visit the full docs here to see additional examples and the API reference.

Welcome!

The prefect-snowflake collection makes it easy to connect to a Snowflake database in your Prefect flows. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Prefect works with Snowflake by providing dataflow automation for faster, more efficient data pipeline creation, execution, and monitoring.

This results in reduced errors, increased confidence in your data, and ultimately, faster insights.

To set up a table, use the execute and execute_many methods. Then, use the fetch_many method to retrieve data in a stream until there's no more data.

By using the SnowflakeConnector as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you're done with them.

Be sure to install prefect-snowflake and save to block to run the examples below!

=== "Sync"

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows

snowflake_flow()

=== "Async"

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
import asyncio

@task
async def setup_table(block_name: str) -> None:
    with await SnowflakeConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    with await SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows

@flow
async def snowflake_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows

asyncio.run(snowflake_flow("example"))

Access underlying Snowflake connection

If the native methods of the block don't meet your requirements, don't worry.

You have the option to access the underlying Snowflake connection and utilize its built-in methods as well.

import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas

@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # case sensitivity matters here!
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=snowflake_connector.database,
            schema=snowflake_connector.schema_  # note the "_" suffix
        )

Resources

For more tips on how to use tasks and flows in an integration, check out Using Collections!

Installation

Install prefect-snowflake with pip:

pip install prefect-snowflake

A list of available blocks in prefect-snowflake and their setup instructions can be found here.

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving credentials to block

Note, to use the load method on Blocks, you must already have a block document saved through code or saved through the UI.

Below is a walkthrough on saving a SnowflakeCredentials block through code.

  1. Head over to https://app.snowflake.com/.
  2. Login to your Snowflake account, e.g. nh12345.us-east-2.aws, with your username and password.
  3. Use those credentials to fill replace the placeholders below.
from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.aws
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER"
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

Then, to create a SnowflakeConnector block:

  1. After logging in, click on any worksheet.
  2. On the left side, select a database and schema.
  3. On the top right, select a warehouse.
  4. Create a short script, replacing the placeholders below.
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")

Congrats! You can now easily load the saved block, which holds your credentials and connection info:

from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")

!!! info "Registering blocks"

Register blocks in this module to view and edit them on Prefect Cloud:

prefect block register -m prefect_snowflake

A list of available blocks in prefect-snowflake and their setup instructions can be found here.

Feedback

If you encounter any bugs while using prefect-snowflake, feel free to open an issue in the prefect-snowflake repository.

If you have any questions or issues while using prefect-snowflake, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-snowflake for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-snowflake, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
pip install -e ".[dev]"
  1. Make desired changes
  2. Add tests
  3. Insert an entry to CHANGELOG.md
  4. Install pre-commit to perform quality checks prior to commit:
pre-commit install
  1. git commit, git push, and create a pull request

prefect-snowflake's People

Contributors

ahuang11 avatar broepke avatar chrisguidry avatar dependabot[bot] avatar desertaxle avatar discdiver avatar fraznist avatar garyranson-bl avatar johnlemmonmedely avatar peracto avatar prefect-collection-synchronizer[bot] avatar sam-phinizy avatar thesubneo avatar urimandujano avatar zzstoatzz avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

prefect-snowflake's Issues

Can't choose what kind of cursor to use with snowflake

Expectation / Proposal

When running a query I wasn't able to figure out how to specify which kind of cursor I wanted to use with Snowflake.

Traceback / Example

Ideally would be something like this

connect.fetch_all("SELECT * FROM TABLE",cursor_class=DictCursor

leverage executemany to stream data

In using the current main branch version of the library and leveraging the basic example of creating a credentials and connection object, when the query execution is running a long running query and the flow run gets canceled via
Command + C, the snowflake query is not getting canceled.

snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="login",
        autocommit=True,
        password=os.getenv("PRIVATE_KEY_PASSPHRASE"),
        private_key="private key string",
        role="sysadmin",
    )
    snowflake_connector = SnowflakeConnector(
        database="development",
        warehouse="compute_wh",
        schema="braun",
        credentials=snowflake_credentials,
    )
    file_counter = 0
    with snowflake_connector.get_connection() as sf:
        with sf.cursor() as cur:
            cur.execute("long running query")

I would expect this to behave the same way as using the snowflake connector directly.

Issues deleting from or inserting rows from temp table using SnowflakeConnector

Hi,

The process in which I load data from my ELT's is typically as follows- I have a DataFrame or a CSV that I want to load into a snowflake table. I execute a query to create a temporary table to stage my data. I then write my DataFrame to the temporary table and execute a query to remove duplicate rows from the temporary table. From there, I insert the "deduplicated" temporary table into the original source table.

I've been able to verify that the data within my DataFrames is correct. Additionally, the temporary tables are populating correctly. However, the commands to execute the deduplication and insert steps do not seem to be working as intended. Both the insert and delete row counts are returning as 0 rows, despite there being duplicate rows that should be deleted, and missing rows that should be inserted.

@task(retries=3)
def upload(index: Index, esc): #esc is the external sources snowflake connector
    name = index.name.lower()
    pks = index.PKs
    logger = get_run_logger()
    if index.dedup=='reg':
        df = index.df.replace(',','', regex=True)
        with esc.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"use schema {esc.database}.{esc.schema_};")
                cur.execute(
                    f"""CREATE OR REPLACE
                    TEMPORARY TABLE TEMP_{name} LIKE {name};"""
                )
                logger.info(f"temp_{name} created")
                success, num_chunks, num_rows, _ = write_pandas(
                    conn=conn,
                    df=df,
                    table_name=f"TEMP_{name}",
                    database=esc.database,
                    schema=esc.schema_  # note the "_" suffix
                )
                logger.info(f"==> TEMPORARY TABLE TEMP_{name} LOADED")
                logger.info(num_rows)
                delete_keys = f"WHERE " + " AND ".join(
                    f"TEMP_{name}.%s = {name}.%s" % (pk, pk) for pk in pks
                )
                delete_command = f"""DELETE FROM TEMP_{name} USING {name}
                        {delete_keys};"""
                deletes = cur.execute(delete_command)
                logger.info(f"{deletes.rowcount} rows removed from {name}")
                
                inserts = cur.execute(
                    f"""INSERT INTO {name}
                        SELECT * FROM TEMP_{name};"""
                )
                logger.info(f"{inserts.rowcount} rows inserted into {name}")

Password is revealed in the connection_params when logging

I thought that by using a block that the password would not get exposed in any logs..

Which seems to work in part but not for the connect_params as shown below:

account='my_account' user='PREFECT' password=SecretStr('**********') database='my_database' 
warehouse='DEVELOPING' private_key=None authenticator=None token=None schema_='my_schema' 
role='SYSADMIN' autocommit=None connect_params={'account': 'my_account', 'user': 'PREFECT', 'password': 
'<password_gets_exposed_here>', 'database': 'my_database', 'warehouse': 'DEVELOPING', 'schema': 'my_schema', 
'role': 'SYSADMIN', 'application': 'Prefect_Snowflake_Collection'} _block_document_id=UUID('a0579d66-482e-41f8- 
bbb4-f9a8158ff3bd') _block_document_name='snowflake-conn' _is_anonymous=False

That is to say

Given:

 conn = await SnowflakeCredentials.load("snowflake-conn")
logger.info(conn)

Any password held in the block is exposed in the connection_params dict.

Receiving AttributeError: __enter__ when running prefect-snowflake example code

Expectation / Proposal

When running the following code in prefect-snowflake examples I receive the below error

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows

snowflake_flow()

Traceback / Example

Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/anandbhakta/python/data-warehouse/Prefect/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/anandbhakta/python/data-warehouse/Prefect/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/anandbhakta/python/data-warehouse/Prefect/.venv/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/anandbhakta/python/data-warehouse/Prefect/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/anandbhakta/python/data-warehouse/Prefect/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/anandbhakta/python/data-warehouse/Prefect/viator_weekly.py", line 72, in setup_table
    with SnowflakeConnector.load(block_name) as connector:
AttributeError: __enter__

Connect params are overwritten when calling snowflake_connector.get_connection

I'm trying to run a transaction with the connection information in the snowflake_connector but my autocommit param is overwritten to None.

with snowflake_connector.get_connection(autocommit=False) as conn:

Expectation / Proposal

The resulting conn object should have set autocommit=False.

It looks like the connect params are combined with some defaults but the overrides I set are given the least precedence by being added to the dictionary first.

Traceback / Example

I'll have a PR up soon, I believe it's a simple fix.

add a poll_frequency_seconds to snowflake snowflake_query and snowflake_multiquery

I have a problem with prefect-snowflake and/or ConcurrentTaskRunner and/or asyncio:
I’m trying to run ~10 snowflake queries simultaneously.
And only 4 or 5 or 6 are picked, the rest queries run when some previous query finishes.
It looks that the threads do not “go to sleep” as intended when they wait for snowflake results and because of that other tasks are not picked.

There is a

asyncio.sleep(0.05)

and I believe that when processor is a bit busy, those “mini sleeps” (0.05s) are not really happening, and other threads cannot start…
When I change 0.05 to for example 1s, I am able to run 10 queries concurrently

Add a task to run PUT file commands

Problem

Running commands such as put file:///data.csv @my_csv_stage is a common (and efficient) way of loading data into a staging area in Snowflake. This doesn't work using the query task since this doesn't seem to be supported with the Snowflake connector for Python, but rather only from SnowSQL.

Desired behavior

Investigate and if possible, add a task to enable loading data from files into Snowflake via PUT file command.

Discussion on Slack

View in #prefect-community on Slack

Todd_de_Quincey @Todd_de_Quincey: Prefect 2.0 - Snowflake PUT command from local file error
When attempting to run the below task

result = snowflake_query(
        query="put file:///data.csv @my_csv_stage",
        snowflake_credentials=SNOWFLAKE_CREDENTIALS,
)

I get the following error

11:40:45.794 | ERROR   | Task run 'snowflake_query-eb69c8ef-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/prefect/engine.py", line 796, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/prefect_snowflake/database.py", line 68, in snowflake_query
    response = cursor.execute_async(query, params=params)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute_async
    return self.execute(*args, **kwargs)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 755, in execute
    sf_file_transfer_agent.execute()
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py", line 355, in execute
    self._parse_command()
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py", line 852, in _parse_command
    response = self._ret["data"]
KeyError: 'command'

Setting a breakpoint at /Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py::854 and inspecting the response = _self_._ret["data"] object, there is indeed no command object in there.

Not sure if this is a user error, but haven’t been able to find much in the way of docs or examples for this.

Note: I have manually checked in snowflake that the user and role has appropriate access to the stage. And inspecting the os path, the file is accessible from the code.

Anna_Geller @Anna_Geller: Could you share the full flow code showing how you use it in your flow? Hide any credentials

Todd_de_Quincey @Todd_de_Quincey: That is the full flow at this stage - I stripped everything out to isolate the problem

from prefect_snowflake.database import snowflake_query
from prefect import flow

from util.const import AWS_CREDENTIALS, SNOWFLAKE_CREDENTIALS, S3_RAW_BUCKET


@flow
def snowflake_pipeline():
    result = snowflake_query(
        query="put file:///data.csv @my_csv_stage",
        snowflake_credentials=SNOWFLAKE_CREDENTIALS,
    )

    return None


state = snowflake_pipeline()

And replacing the query with a select or insert works fine. So I know the connection is fine
Literally just doing a spike to see if Prefect is going to meet our needs

Anna_Geller @Anna_Geller: A dumb question but is a put command considered a query by Snowflake API?
I think this might be true, based on the top answer here, you can't execute put command using a Python connector, you need to do it via SnowSQL

Snowflake Community

Todd_de_Quincey @Todd_de_Quincey: It’s listed under their list of SQL commands
https://docs.snowflake.com/en/sql-reference/sql/put.html

Anna_Geller @Anna_Geller: Quote from Snowflake: "You can't execute a PUT with the python connector, you need to use SnowSQL. If you want to execute PUT through python, you'd have to create script file and then execute SnowSQL through an OS command function in python. It's not the cleanest way to go. If you use your own S3 buckets, then you have a lot more options on how to get your data from the local machine to S3 through python, which might be cleaner."

Todd_de_Quincey @Todd_de_Quincey: That’ll be it then. Thought I’d check since this connector is so new. Not an issue for prod, as we would be copying from an external S3 stage
Apologies for wasting your time!

Anna_Geller @Anna_Geller: do you want to open an issue with feature request in the repo?
I'm sure @Andrew_Huang could investigate whether we may add a task for this, this seems to be like a common pattern and quite useful (put is faster than inserts)
never wasting time discussing Prefect Collections! ask us anytime
and btw this comment is from 2019, this might have changed since then

I can open an issue for you here https://github.com/PrefectHQ/prefect-snowflake/issues/new

GitHub

`SnowflakeConnector.load()` snippet references the wrong import path

Given the prompt after creating an instance of the SnowflakeConnector block:

image

A user would assume this import path is correct, yet since this block is implemented in prefect_snowflake.database, the import fails with:

❯ python orchestrator.py
Traceback (most recent call last):
  File "/Users/nate/.../project/orchestrator.py", line 6, in <module>
    from prefect_snowflake import SnowflakeConnector
ImportError: cannot import name 'SnowflakeConnector' from 'prefect_snowflake' (/Users/nate/opt/miniconda3/envs/project/lib/python3.10/site-packages/prefect_snowflake/__init__.py)

Multi-query sync

Not a necessary addition, but could be a helpful feature. Essentially, if there are multiple snowflake queries that depend on each other and it would be beneficial to run them from the same session- for example - queries to build a temporary table, to stage a file, load into the temp table, perform some sort of transformation on the temp table, then load that table into the main table. snowflake_query, snowflake_query_sync, and snowflake_multiquery won't be able to do this as the first two run into the issue of closing the connection and dropping the temp table, and the multiquery will attempt to run those queries async.

Rename SnowflakeConnector 'schema' to 'schema_name'

Description

Might be nice to rename the schema arg in the SnowflakeConnector block.

 snowflake_connector = SnowflakeConnector(
        database="db_name",
        warehouse="warehouse_name",
        schema="schema_name",
        credentials=snowflake_credentials
    )

Current behavior

Accessing snowflake_connector.database works great; snowflake_connector.schema accesses the block class method called schema(). I need to use snowflake_connector.schema_ to access the desired attribute.

Desired behavior

Calling snowflake_connector.snowflake_schema or snowflake_connector.schema_name would be preferred.

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

Typo in code sample in README.md causes failure

There is a small typo in this example. Proposing the change for future readers. In the section "Access underlying Snowflake connection" the word “connector” needed to be "connection”. It's a little hard to see when you glance at it, but it cuases a failure in code execution.

Expectation / Proposal

The second to last line is incorrect connector needs to be connection

    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connector.cursor() as cursor:
            cursor.execute(statement)

and should be

    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connection.cursor() as cursor:
            cursor.execute(statement)

Traceback / Example

Error "Do not provide both private_key and private_key_path; select one."

When I try to establish a Snowflake connection like described in the docs, I see an error:

"Do not provide both private_key and private_key_path; select one."

My flow setup looks roughly like this:

@flow
def snowflake_query_flow():
snowflake_credentials = SnowflakeCredentials(
account="account",
user="user",
password="password",
role="MY_ROLE"
)
snowflake_connector = SnowflakeConnector(
database="database",
warehouse="warehouse",
schema="schema",
credentials=snowflake_credentials
)
result = snowflake_query(
"SELECT * FROM my_table;",
snowflake_connector
)
return result

snowflake_query_flow()

I'm using the latest version (0.25) which was released a couple of days ago. I saw that there were some changes made to this part.

Snowflake Credentials Endpoint Issue

I believe there is a bug located within the following script, but am unsure. The arguments and the get values seem to be out of sync.

https://github.com/PrefectHQ/prefect-snowflake/blob/main/prefect_snowflake/credentials.py

class SnowflakeCredentials(Block):
    """
    Block used to manage authentication with Snowflake.
    Args:
        account (str): The snowflake account name.
        user (str): The user name used to authenticate.
        password (SecretStr): The password used to authenticate.
        private_key (SecretStr): The PEM used to authenticate.
        authenticator (str): The type of authenticator to use for initializing
            connection (oauth, externalbrowser, etc); refer to
            [Snowflake documentation](https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect)
            for details, and note that `externalbrowser` will only
            work in an environment where a browser is available.
        token (SecretStr): The OAuth or JWT Token to provide when
            authenticator is set to OAuth.
        okta_endpoint (str): The Okta endpoint to use when authenticator is
            set to `okta_endpoint`, e.g. `https://<okta_account_name>.okta.com`.
        role (str): The name of the default role to use.
        autocommit (bool): Whether to automatically commit.
    Example:
        Load stored Snowflake credentials:
        ```python
        from prefect_snowflake import SnowflakeCredentials
        snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")
        ```

    authenticator: Literal[
        "snowflake",
        "externalbrowser",
        "okta_endpoint",
        "oauth",
        "username_password_mfa",
    ] = Field(  # noqa
        default="snowflake",
        description=("The type of authenticator to use for initializing connection"),
    ),
    )
    endpoint: Optional[str] = Field(
        default=None,
        description=(
            "The Okta endpoint to use when authenticator is set to `okta_endpoint`"
        ),
    )

Which then violates the following method below as the requested value doesn't exist.

    @root_validator(pre=True)
    def _validate_okta_kwargs(cls, values):
        """
        Ensure an authorization value has been provided by the user.
        """
        authenticator = values.get("authenticator")
        okta_endpoint = values.get("okta_endpoint")
        if authenticator == "okta_endpoint" and not okta_endpoint:
            raise ValueError(
                "If authenticator is set to `okta_endpoint`, "
                "`okta_endpoint` must be provided"
            )
        return values

My output trace points back to this, and within the Orion UI there is no Okta Endpoint field fwiw.

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 205, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 655, in load
    return cls._from_block_document(block_document)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 554, in _from_block_document
    block = block_cls.parse_obj(block_document.data)
  File "pydantic/main.py", line 526, in pydantic.main.BaseModel.parse_obj
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 175, in __init__
    super().__init__(*args, **kwargs)
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for SnowflakeCredentials
__root__
  If authenticator is set to `okta_endpoint`, `okta_endpoint` must be provided (type=value_error)

Issue present here as well.
https://github.com/PrefectHQ/prefect-snowflake/blob/main/prefect_snowflake/database.py#L70

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.