
pydwt's Introduction

pydwt

The pydwt library provides a set of tools for orchestrating data-processing tasks in a directed acyclic graph (DAG). The DAG is composed of tasks with dependencies between them; depending on those dependencies, tasks can be executed in parallel or sequentially.
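To make the DAG idea concrete, here is a minimal sketch that uses only Python's standard graphlib module, not pydwt itself. Each hypothetical task lists the tasks it depends on, and the tasks are grouped into levels: tasks in the same level do not depend on each other, so they could run in parallel, while successive levels run in order.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on.
dependencies = {
    "load_features": set(),
    "load_labels": set(),
    "join": {"load_features", "load_labels"},
    "report": {"join"},
}


def levels(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into levels; tasks in one level have no dependency on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose predecessors are done
        result.append(ready)
        sorter.done(*ready)
    return result


print(levels(dependencies))
# [['load_features', 'load_labels'], ['join'], ['report']]
```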

Installation

pip install pydwt

Guide

This document briefly explains the main modules of the pydwt library:

  • session.py: module for interacting with a database and creating DataFrame objects to manipulate data.
  • dataframe.py: module for defining a DataFrame class for working with data.
  • task.py: module for defining tasks in the DAG.
  • dag.py: module for creating and traversing the DAG.
  • workflow.py: module for running the DAG.

Session

The session.py module is responsible for interacting with a database and creating DataFrame objects to manipulate data. To use this module, you need to create an instance of the Session class, passing an SQLAlchemy engine object and the schema of the database (if it has one). Then, you can use the table method to create a DataFrame object from a table in the database.

Here is an example of how to create a Session object and use the table method to create a DataFrame object:

from sqlalchemy import create_engine
from pydwt.sql.session import Session

engine = create_engine("postgresql://user:password@localhost/dbname")
session = Session(engine, schema="my_schema")

df = session.table("my_table")

DataFrame

The dataframe.py module defines a DataFrame class for working with data. A DataFrame object is essentially a table with labeled columns and rows. You can use it to perform operations such as selecting, filtering, grouping, and aggregating data.

You can also materialize a DataFrame as a table or view in the database by calling the materialize method.

Here is an example of how to create a DataFrame object and perform some operations on it:

from sqlalchemy import create_engine, func
from pydwt.sql.session import Session

engine = create_engine("postgresql://user:password@localhost/dbname")
session = Session(engine, schema="my_schema")

df = session.table("my_table")

# select some columns
df = df.select(df.col1, df.col2)

# filter rows based on a condition
df = df.where(df.col1 > 10)

# group by col2 and sum col1 into a column named "sum_col1"
df = df.group_by(df.col2, agg={df.col1: (func.sum, "sum_col1")})

# store the result in the database as a table named "new_table"
df.materialize("new_table", as_="table")
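The chain above builds a query step by step and only writes to the database on materialize. To illustrate that general idea without a database server, here is a hypothetical, heavily simplified LazyTable class on top of the standard sqlite3 module. It is not pydwt's implementation: columns and conditions are plain SQL strings, and string interpolation is used only for brevity (never do this with untrusted input).

```python
import sqlite3


class LazyTable:
    """Hypothetical, simplified SQL-backed frame: each method wraps the previous
    query in a new SELECT, and nothing is written until materialize() is called."""

    def __init__(self, conn: sqlite3.Connection, sql: str):
        self.conn = conn
        self.sql = sql

    def select(self, *cols: str) -> "LazyTable":
        return LazyTable(self.conn, f"SELECT {', '.join(cols)} FROM ({self.sql})")

    def where(self, condition: str) -> "LazyTable":
        return LazyTable(self.conn, f"SELECT * FROM ({self.sql}) WHERE {condition}")

    def group_by(self, key: str, agg_expr: str, alias: str) -> "LazyTable":
        return LazyTable(
            self.conn,
            f"SELECT {key}, {agg_expr} AS {alias} FROM ({self.sql}) GROUP BY {key}",
        )

    def materialize(self, name: str, as_: str = "table") -> None:
        kind = {"table": "TABLE", "view": "VIEW"}[as_]  # KeyError for anything else
        self.conn.execute(f"CREATE {kind} {name} AS {self.sql}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (col1 INTEGER, col2 TEXT)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [(5, "a"), (12, "a"), (20, "b"), (30, "b")],
)

df = LazyTable(conn, "SELECT * FROM my_table")
df = df.select("col1", "col2").where("col1 > 10")
df = df.group_by("col2", "SUM(col1)", "sum_col1")
df.materialize("new_table", as_="table")

print(conn.execute("SELECT * FROM new_table ORDER BY col2").fetchall())
# [('a', 12), ('b', 50)]
```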

Task

The task.py module defines a Task class for representing a task in the DAG. A Task object has a run method that is responsible for executing the task. You can also define the task's dependencies, schedule, and other parameters when creating the object.

To create a Task object, decorate a function with @Task(); the body of the decorated function is the work the task performs. Here is an example:

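from sqlalchemy import case, create_engine
from pydwt.core.task import Task
from pydwt.sql.session import Session

engine = create_engine("postgresql://user:password@localhost/dbname")
session = Session(engine, schema="my_schema")


@Task()
def task_one():
    # derive new_column from preds and store the result as a table
    df = session.table("features")
    df = df.with_column("new_column", case((df.preds == "hw", "W")))
    df.materialize("new_table", as_="table")


@Task(depends_on=[task_one])
def task_two():
    # runs after task_one and reads the table it produced
    df = session.table("new_table")
    df = df.where(df.new_column == "W")
    df = df.with_column("new_column", case((df.preds == "hw", "W")))
    df.show()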
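The depends_on argument is what turns decorated functions into a DAG. The following hypothetical sketch shows how a decorator factory can record each function together with its upstream tasks, from which the DAG edges follow. The task decorator and REGISTRY names are invented here, and unlike pydwt's Task this decorator returns the plain function.

```python
from typing import Callable, Optional

# Hypothetical registry: task name -> (function, names of upstream tasks).
REGISTRY: dict[str, tuple[Callable[[], None], list[str]]] = {}


def task(depends_on: Optional[list[Callable]] = None):
    """Hypothetical decorator factory with the same shape as @Task(depends_on=[...])."""
    def decorator(fn: Callable[[], None]) -> Callable[[], None]:
        upstream = [dep.__name__ for dep in (depends_on or [])]
        REGISTRY[fn.__name__] = (fn, upstream)
        return fn  # the original function stays directly callable
    return decorator


@task()
def task_one():
    print("running task_one")


@task(depends_on=[task_one])
def task_two():
    print("running task_two")


# DAG edges recovered from the decorators, as (upstream, downstream) pairs.
edges = [(up, name) for name, (_, ups) in REGISTRY.items() for up in ups]
print(edges)  # [('task_one', 'task_two')]
```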

Create a new pydwt project:

pydwt new <my_project>

This command creates a new project named "my_project" with the required file structure:

my_project/
    models/
        example.py
    dags/
settings.yml
  • project_name/models/: where you put your tasks
  • project_name/dags/: where exported DAG PNG files are written
  • settings.yml: the configuration file for your project, holding options such as the database connection, the project name, and per-task and source settings (see "Configuration of your pydwt project").

Export the DAG

pydwt export-dag

will export the current state of your DAG to project_name/dags/ as a timestamped PNG file.
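The README does not document pydwt's export format or file naming. As an assumption-labeled sketch, a DAG can be described as Graphviz DOT text and saved under a timestamped file name; the to_dot and timestamped_path helpers and the dag_YYYYmmdd_HHMMSS.png pattern are hypothetical.

```python
from datetime import datetime
from pathlib import Path


def to_dot(deps: dict[str, list[str]]) -> str:
    """Render a task -> upstream-tasks mapping as Graphviz DOT source."""
    lines = ["digraph dag {"]
    for name in sorted(deps):
        lines.append(f'  "{name}";')
        for upstream in deps[name]:
            lines.append(f'  "{upstream}" -> "{name}";')  # edge points downstream
    lines.append("}")
    return "\n".join(lines)


def timestamped_path(folder: str, now: datetime) -> Path:
    # Hypothetical naming scheme; pydwt's actual file names may differ.
    return Path(folder) / f"dag_{now:%Y%m%d_%H%M%S}.png"


deps = {"task_one": [], "task_two": ["task_one"]}
print(to_dot(deps))
print(timestamped_path("my_project/dags", datetime(2024, 1, 31, 9, 5, 0)))
```

With Graphviz installed, `dot -Tpng dag.dot -o dag.png` renders such DOT text to a PNG image.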

Run your project

pydwt run <module.function_name>

If no argument is provided, the current state of your DAG is run. Tasks are processed level by level, and the tasks within a level run in parallel with the ThreadExecutor. If a task fails, its child tasks are not run.
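The level-by-level strategy with failure propagation can be sketched with the standard graphlib and concurrent.futures modules. This is a hypothetical illustration, not pydwt's code: run_by_level and the "success"/"failed"/"skipped" status names are invented here, and every task must appear as a key in deps.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Callable


def run_by_level(tasks: dict[str, Callable[[], None]],
                 deps: dict[str, set[str]]) -> dict[str, str]:
    """Run tasks level by level; skip any task whose upstream failed or was skipped."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    status: dict[str, str] = {}
    with ThreadPoolExecutor() as pool:
        while sorter.is_active():
            level = list(sorter.get_ready())
            futures = {}
            for name in level:
                if any(status[up] != "success" for up in deps.get(name, set())):
                    status[name] = "skipped"  # a parent did not succeed
                else:
                    futures[name] = pool.submit(tasks[name])
            # The whole level must finish before the next one starts.
            for name, fut in futures.items():
                status[name] = "failed" if fut.exception() else "success"
            sorter.done(*level)
    return status


def ok() -> None:
    pass


def boom() -> None:
    raise RuntimeError("boom")


tasks = {"a": ok, "b": boom, "c": ok, "d": ok}
deps = {"a": set(), "b": set(), "c": {"a"}, "d": {"b"}}
result = run_by_level(tasks, deps)
print(sorted(result.items()))
# [('a', 'success'), ('b', 'failed'), ('c', 'success'), ('d', 'skipped')]
```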

If an argument of the form module.function_name is provided (for instance, example.task_one), all tasks in the DAG leading to this task are run first, and the task itself runs only if its parent tasks succeeded.
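Running up to a given task amounts to keeping that task plus all of its transitive upstream tasks. A minimal sketch with a hypothetical upstream_closure helper, a depth-first walk over a task-to-dependencies mapping:

```python
def upstream_closure(target: str, deps: dict[str, set[str]]) -> set[str]:
    """Return target plus every task it transitively depends on."""
    seen: set[str] = set()
    stack = [target]
    while stack:
        name = stack.pop()
        if name not in seen:
            seen.add(name)
            stack.extend(deps.get(name, ()))  # visit the direct parents next
    return seen


deps = {
    "task_one": set(),
    "task_two": {"task_one"},
    "task_three": set(),
    "task_four": {"task_two", "task_three"},
}

keep = upstream_closure("task_two", deps)
sub_dag = {name: deps[name] for name in keep}  # closed under dependencies by construction
print(sorted(keep))  # ['task_one', 'task_two']
```

The resulting sub-DAG contains only the requested task and its ancestors, so an executor run on it never touches unrelated branches such as task_three.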

Test your connection setup

pydwt test-connection

will test the current setup of your DB connection according to your settings.yml file.
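The README does not say how pydwt performs this check. A common approach is to open a connection and run a trivial query; with SQLAlchemy that would be `with engine.connect() as conn: conn.execute(text("SELECT 1"))`. The runnable sketch below shows the same idea with the standard sqlite3 module, so it needs no server; check_connection is a hypothetical name.

```python
import sqlite3


def check_connection(database: str) -> bool:
    """Return True if a trivial query succeeds against the given SQLite database."""
    try:
        conn = sqlite3.connect(database)
        try:
            return conn.execute("SELECT 1").fetchone() == (1,)
        finally:
            conn.close()
    except sqlite3.Error as exc:  # e.g. OperationalError: unable to open database file
        print(f"connection failed: {exc}")
        return False


print(check_connection(":memory:"))  # True
```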

Configuration of your pydwt project

The settings.yml file is a configuration file for your pydwt project. It stores various settings such as the project name, database connection details, and DAG tasks.

connection

The connection section contains the configuration details for connecting to the database. The available options are:

  • url: the connection string to your db

You can add other keys; they are forwarded to the underlying SQLAlchemy create_engine function. For instance, adding echo: true results in a call to create_engine(url=url, echo=True). See the SQLAlchemy create_engine documentation for the supported arguments.
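A sketch of this forwarding, assuming the YAML connection section has already been parsed into a dictionary: the hypothetical engine_arguments helper separates url from the remaining keys, which would then be passed through unchanged.

```python
def engine_arguments(connection: dict) -> tuple[str, dict]:
    """Split a parsed `connection` section into the URL and extra keyword arguments."""
    options = dict(connection)  # copy so the caller's settings are not mutated
    url = options.pop("url")    # required key
    return url, options         # every other key is forwarded as-is


settings = {"url": "sqlite:///example.db", "echo": True}
url, kwargs = engine_arguments(settings)
print(url, kwargs)  # sqlite:///example.db {'echo': True}
# With SQLAlchemy installed, the engine would then be: create_engine(url, **kwargs)
```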

project

The project section contains the project-related settings. The available options are:

  • name: the name of the project

tasks

This section contains the configuration for each task defined in the pydwt project.

Each task is identified by its name, and the configuration is stored as a dictionary.

The dictionary can contain any key-value pairs that the task implementation may need to use, but it must have a key named materialize.

  • The materialize key specifies how the task output is stored. Its value is either view or table: view stores the output as a SQL view, and table stores it as a SQL table.
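The practical difference between the two values can be seen with the standard sqlite3 module: a view re-runs its query on every read, while a table stores a snapshot taken at creation time. The materialize helper and the task configurations below are hypothetical and only mirror the view/table choice; this is not pydwt's implementation.

```python
import sqlite3


def materialize(conn: sqlite3.Connection, name: str, select_sql: str, kind: str) -> None:
    """Store a SELECT as a view (query re-run on read) or a table (stored snapshot)."""
    if kind not in ("view", "table"):
        raise ValueError(f"materialize must be 'view' or 'table', got {kind!r}")
    conn.execute(f"DROP {kind.upper()} IF EXISTS {name}")
    conn.execute(f"CREATE {kind.upper()} {name} AS {select_sql}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (x INTEGER)")
conn.execute("INSERT INTO src VALUES (1)")

# Hypothetical per-task configurations, as they might look once settings.yml is parsed.
view_config = {"materialize": "view"}
table_config = {"materialize": "table"}
materialize(conn, "as_view", "SELECT x FROM src", view_config["materialize"])
materialize(conn, "as_table", "SELECT x FROM src", table_config["materialize"])

conn.execute("INSERT INTO src VALUES (2)")  # change the source after materializing

print(conn.execute("SELECT COUNT(*) FROM as_view").fetchone())   # (2,): the view sees the new row
print(conn.execute("SELECT COUNT(*) FROM as_table").fetchone())  # (1,): the table is a snapshot
```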

A task can access its configuration by declaring a config argument whose default is Provide[Container.config.tasks.<task_name>] and decorating the function with @inject. The injected config argument is a dictionary containing the configuration for that task.

Example:

from pydwt.core.task import Task
from dependency_injector.wiring import inject, Provide
from pydwt.core.containers import Container

@Task()
@inject
def task_one(config:dict = Provide[Container.config.tasks.task_one]):
    print(config)

@Task(depends_on=[task_one])
def task_two():
    print("some processing")

sources

The sources section contains the database sources that can be used in the project. Each source must have a unique name and specify the schema and table to use for the source.

sources:
  name_alias:
    table: xxx
    schema: yyy

You can then request this data source in your tasks:

from pydwt.core.task import Task
from dependency_injector.wiring import inject, Provide
from pydwt.core.containers import Container

@Task()
@inject
def task_one(
    config:dict = Provide[Container.config.tasks.task_one],
    repo= Provide[Container.datasources],
    ):
    df = repo.get_sources("name_alias")
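The README does not show what get_sources returns. As a hypothetical sketch of the alias lookup alone, the entries under sources: can be parsed into small records keyed by alias, so a task refers to name_alias instead of hard-coding yyy.xxx. The Source and SourceRepository names are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Source:
    schema: str
    table: str

    def qualified_name(self) -> str:
        return f"{self.schema}.{self.table}"


class SourceRepository:
    """Hypothetical lookup of the aliases declared under `sources:` in settings.yml."""

    def __init__(self, sources_config: dict[str, dict[str, str]]):
        self._sources = {
            alias: Source(schema=cfg["schema"], table=cfg["table"])
            for alias, cfg in sources_config.items()
        }

    def get(self, alias: str) -> Source:
        try:
            return self._sources[alias]
        except KeyError:
            raise KeyError(f"unknown source alias: {alias!r}") from None


repo = SourceRepository({"name_alias": {"table": "xxx", "schema": "yyy"}})
print(repo.get("name_alias").qualified_name())  # yyy.xxx
```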

License

This project is licensed under GPL.

pydwt's People

Contributors

mg30

pydwt's Issues

Run only one task

Given a task name, it should be possible to run only that task, or the task and its dependencies.

Improve Parallelism

Currently, parallelism is based on the levels of the DAG. However, if one task in a level takes a long time, it blocks that level and prevents other paths in the DAG from completing.

Given a DAG, I want to start each task as soon as its predecessors have completed without error.
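One way to implement this request, sketched with the standard library's concurrent.futures.wait(..., return_when=FIRST_COMPLETED): instead of waiting for a whole level, the scheduler wakes up whenever any task finishes and submits every task whose predecessors have all succeeded. This is a hypothetical design, not pydwt's code; run_eagerly and the status names are invented here.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable


def run_eagerly(tasks: dict[str, Callable[[], None]],
                deps: dict[str, set[str]]) -> dict[str, str]:
    """Start each task as soon as all of its upstream tasks have succeeded."""
    status: dict[str, str] = {}      # task name -> "success" | "failed" | "skipped"
    running: dict[Future, str] = {}  # in-flight future -> task name
    with ThreadPoolExecutor() as pool:
        while len(status) < len(tasks):
            progressed = False
            for name in tasks:
                if name in status or name in running.values():
                    continue
                upstream = deps.get(name, set())
                if any(status.get(u) in ("failed", "skipped") for u in upstream):
                    status[name] = "skipped"  # never run below a failure
                    progressed = True
                elif all(status.get(u) == "success" for u in upstream):
                    running[pool.submit(tasks[name])] = name
                    progressed = True
            if running:
                # Block only until one task finishes, then re-scan for newly ready tasks.
                done, _ = wait(list(running), return_when=FIRST_COMPLETED)
                for fut in done:
                    name = running.pop(fut)
                    status[name] = "failed" if fut.exception() else "success"
            elif not progressed:
                raise ValueError("unsatisfiable dependencies (cycle or unknown task)")
    return status


finished: list[str] = []


def make(name: str, delay: float = 0.0, fail: bool = False) -> Callable[[], None]:
    def run() -> None:
        time.sleep(delay)
        if fail:
            raise RuntimeError(f"{name} failed")
        finished.append(name)
    return run


tasks = {
    "slow": make("slow", delay=0.5),
    "fast": make("fast"),
    "after_fast": make("after_fast"),
    "bad": make("bad", fail=True),
    "after_bad": make("after_bad"),
}
deps = {"after_fast": {"fast"}, "after_bad": {"bad"}}

status = run_eagerly(tasks, deps)
print(sorted(status.items()))
# [('after_bad', 'skipped'), ('after_fast', 'success'), ('bad', 'failed'), ('fast', 'success'), ('slow', 'success')]
print(finished)  # after_fast normally finishes well before slow
```

With level-based execution, after_fast would wait for slow to finish; here only after_bad, which sits below a failure, is held back.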

Improve DataFrame API

Improve the dataframe API:

  • accept a list of strings in the select method
  • add a filter method as an alias of where
  • filter should accept a string expression
  • add a union implementation

Connection string

Allow a more flexible way to provide the connection string, to expand the supported databases through SQLAlchemy dialects.

Filter the dag when passing a task name to the run method

The goal is to provide a way to execute a task with or without its dependencies.

If --with-depth is passed to the run method along with a task name, then, given that task name, I want the DAG filtered so that it includes all paths from the sources to the node whose name equals the task name.

Otherwise, run the task alone, without its dependencies.
