rocs-org / data-pipelines
Data engineering monorepo with Airflow, dbt and postgres
Home Page: https://ornt.biologie.hu-berlin.de/airflow
To make implementing new pipelines easy for everyone, we want
Since the links to the cases data files behind the ArcGIS dashboard break without warning from time to time, we are taking this as an opportunity to switch the source of the cases data to https://github.com/robert-koch-institut/SARS-CoV-2_Infektionen_in_Deutschland
This data is used in a couple of DAGs, and the new source does not contain, e.g., the Bundesland, Landkreis and Altersgruppe2 (irrelevant) columns, so this will take a bit of digging.
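A rough sketch of how the missing columns might be recovered from the new source; the file and column names are assumptions based on the RKI repository and should be verified:

import pandas as pd

# Assumed file and column names from the RKI repository.
cases = pd.read_csv("Aktuell_Deutschland_SarsCov2_Infektionen.csv")

# The Bundesland id is encoded in the leading digits of the five-digit
# district id (e.g. IdLandkreis 01001 -> Bundesland 01).
cases["IdBundesland"] = cases["IdLandkreis"] // 1000

# Human-readable Bundesland/Landkreis names still need a separate lookup
# table, since the new source only ships numeric ids.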
To make the infrastructure accessible for people who want to add their pipelines to it, it should feature some copy paste boilerplate examples and instructions on how to use them.
Currently, we rebuild our application image (containing airflow, poetry and all python deps) every time the dependencies change. This takes a lot of time and slows down the dev process.
Proposed solution: upload the current image (reflecting the state of the application on the main branch) to a Docker registry and pull the image for further development from there (thus only installing incremental changes in dependencies).
The pipeline producing the detections+json for https://corona-datenspende.de/science/monitor/ needs to be ported to an Airflow DAG. Along the way I also want to include a step to drop data from earlier than 2020-04-12, in case that is included in the ingest.
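The drop step could look roughly like this (Airflow 2 task API; the table name and connection string are placeholders):

from airflow.decorators import task
import psycopg2

@task
def drop_early_data():
    # Placeholder table/connection; remove rows from before the start
    # of data collection on 2020-04-12, in case the ingest includes them.
    with psycopg2.connect("...") as conn, conn.cursor() as cur:
        cur.execute(
            "DELETE FROM datenspende_derivatives.detections WHERE date < %s",
            ("2020-04-12",),
        )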
The docs say that this tool was made to enable people to convert their data wrangling in Jupyter notebooks into Airflow pipelines and jobs. As this is something that most of our users might want to do, we will evaluate this option and, if possible, create a dummy workflow as a hands-on example.
Thryve added a new column to the choice table that we don't expect and that breaks the DAG.
The symptoms question in the weekly survey has a new ID since ~2022/01/18. This causes the feature extraction pipeline to not gather answers for the new question, resulting in symptoms not being listed in the extracted features.
The new question ID is 137.
We have to extract symptom data from both ID 86 (the old one) and ID 137 (the new one) without breaking backward compatibility with extractions that only gathered answers from one question.
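A minimal sketch of the adjusted extraction; the table and column names are assumptions and should be checked against the actual datenspende schema:

import psycopg2

SYMPTOM_QUESTION_IDS = (86, 137)  # old and new question id

with psycopg2.connect("...") as conn, conn.cursor() as cur:
    # psycopg2 adapts the Python tuple to a SQL list, so answers to both
    # the old and the new question are gathered in one pass.
    cur.execute(
        "SELECT user_id, question, answer FROM datenspende.answers "
        "WHERE question IN %s",
        (SYMPTOM_QUESTION_IDS,),
    )
    rows = cur.fetchall()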
We currently do not receive/process the vital data at epoch granularity. This data holds tremendous value, and I want to re-add it to our database.
Steps
Pipeline exits with "connection reset by peer".
After restructuring the repo, the CD pipeline is broken. Fix it and make sure that next time it breaks, it also goes red.
It seems like the format of the files providing cases data has changed, resulting in failures in data collection.
For survey data from the "Tests and Symptoms" survey, we need a post-processed table that contains the following information:
To resolve the DAG interdependency between the survey and the vital data DAGs in the data donation project, we have to change their schedules. However, the schedules in the production deployment don't update even though the DAG parameters have already been changed.
In https://github.com/rocs-org/rocs-scripts, under fixme, there are two scripts; both are broken at the moment.
Due to how psycopg2 handles batched insert queries, the table is truncated again at every batch that is executed; see below:
https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_values
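A plausible minimal reproduction and fix, assuming the TRUNCATE is embedded in the insert query (table and columns are made up): execute_values re-executes the full SQL string for every page of page_size rows, so an embedded TRUNCATE wipes all previously inserted pages and only the last batch survives.

import psycopg2
from psycopg2.extras import execute_values

rows = [(i, f"name-{i}") for i in range(1000)]

with psycopg2.connect("...") as conn, conn.cursor() as cur:
    # Broken: "TRUNCATE my_table; INSERT INTO my_table VALUES %s" passed
    # to execute_values runs the TRUNCATE once per 100-row page.
    # Fix: truncate once, then batch-insert.
    cur.execute("TRUNCATE my_table")
    execute_values(
        cur,
        "INSERT INTO my_table (id, name) VALUES %s",
        rows,
        page_size=100,
    )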
Just spin up the docker stack and install dependencies. No tests, no linting etc. Those come later.
To have consistent database schemas in all environments, we want to have a single source of truth for them. This can be done via migrations that are tracked in git together with a migration manager that applies them to the databases that we use.
A good tooling option would be yoyo-migrations, as it allows migrations to be written in plain SQL as well as Python functions and works well with psycopg2.
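A minimal sketch of applying migrations with yoyo-migrations (the connection string is a placeholder; the directory is the one referenced elsewhere in this repo):

from yoyo import read_migrations, get_backend

backend = get_backend("postgresql://user:password@localhost/rocs")
migrations = read_migrations("dags/database/migrations/migration_files")

# Apply only the migrations that this database has not seen yet.
with backend.lock():
    backend.apply_migrations(backend.to_apply(migrations))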
Docker builds are failing due to a missing signature for the repository of one of the dependencies.
We usually need case numbers (i) raw, (ii) 7d-averaged/summed, or (iii) 7d-averaged/summed per 100k. I would like to have this provided as an extra table called incidences in our db.
I've written the following script that can be run whenever the case numbers are downloaded and processed. It uses polars v0.8.9 and computes incidences for every location on every level of abstraction that we have (4 levels):
These are the columns with types (for creating the table in the db):

import polars as po  # the script imports polars as `po`

columns = [
    ("location_id", po.UInt16),
    ("location_level", po.UInt8),
    ("date_of_report", po.Date),
    ("new_cases", po.Int32),
    ("new_cases_last_7d", po.Int32),
    ("incidence_7d_per_100k", po.Float32),
    ("new_deaths", po.Int32),
    ("nuts3", po.Utf8),  # was bare `str`; polars' string dtype
    ("population", po.UInt32),
    ("state", po.UInt8),
]
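A corresponding table definition, sketched as a plain-SQL yoyo migration step; the schema name and the polars-to-postgres type mapping (unsigned ints widened to the next larger signed type) are my assumptions:

from yoyo import step

steps = [
    step(
        """
        CREATE TABLE coronacases.incidences (
            location_id            INTEGER,   -- UInt16
            location_level         SMALLINT,  -- UInt8
            date_of_report         DATE,
            new_cases              INTEGER,
            new_cases_last_7d      INTEGER,
            incidence_7d_per_100k  REAL,
            new_deaths             INTEGER,
            nuts3                  TEXT,
            population             BIGINT,    -- UInt32
            state                  SMALLINT   -- UInt8
        )
        """,
        "DROP TABLE coronacases.incidences",
    )
]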
For the datenspende project, we need to collect user account data from Thryve as a prerequisite for dealing with vital and survey data.
We want to deploy to production each time we tag a release.
The data export from Thryve contains a list of all users that are currently using the app. We should delete all users on our side that are not part of that list.
Implement the detections classifier, implementing the interface of sklearn classifiers, in a separate library package.
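A skeleton of the sklearn estimator contract the package would implement (class name and hyperparameter are placeholders; the detection logic itself is elided):

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

class DetectionsClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, threshold=0.5):
        self.threshold = threshold  # only store hyperparameters here

    def fit(self, X, y):
        X, y = check_X_y(X, y)
        self.classes_ = np.unique(y)
        # ... fit the actual detection model here ...
        return self

    def predict(self, X):
        check_is_fitted(self)
        X = check_array(X)
        # ... real decision rule goes here; placeholder predicts class 0 ...
        return np.full(X.shape[0], self.classes_[0])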
This is a workaround (#98) to resolve the issue of DAGs accessing the same table. In the not too distant future we probably want to restructure the datenspende DAGs.
To avoid unnecessary CI failures, run code formatting automatically. Also, run linting and reject the commit if linting does not pass.
For our production deployment, we need to use the actual production database of the group and probably set some other environment variables that differ from our CI and local testing environments.
In order to quickly pull additional information, the table datenspende_derivates.homogenized_features should contain a column with the questionnaire_session of the corresponding data in datenspende.answers.
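A sketch of the schema change and backfill; the join keys are assumptions and need to be checked against the actual tables:

import psycopg2

with psycopg2.connect("...") as conn, conn.cursor() as cur:
    cur.execute(
        "ALTER TABLE datenspende_derivates.homogenized_features "
        "ADD COLUMN questionnaire_session BIGINT"
    )
    # Assumed join keys (user_id, questionnaire); adjust as needed.
    cur.execute(
        """
        UPDATE datenspende_derivates.homogenized_features AS f
        SET questionnaire_session = a.questionnaire_session
        FROM datenspende.answers AS a
        WHERE a.user_id = f.user_id
          AND a.questionnaire = f.questionnaire
        """
    )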
To run end-to-end tests for our pipelines, we need to mock our production database. To do this, we have to run all the migrations in CI every time to set up the DB schemas, so that tests find an environment that mirrors the one we have in production.
Import the predictions model and load baseline and user features data to make infection predictions. Save the results in the predictions table.
This should implement a simplified version of the baseline calculation here, without all the edge-case handling for missing or scarce data.
Set up the minimum required stack to run airflow with a target db for testing.
Calculation of incidences is currently failing due to an integer overflow (see logs here).
This can probably be fixed by either changing the incidence calculation script or changing the db schema to BIGINT for the respective values. @benmaier can probably advise.
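If the schema route is taken, the fix could be a one-line migration; the table and column names are assumptions based on the incidences table above:

from yoyo import step

steps = [
    step(
        "ALTER TABLE coronacases.incidences "
        "ALTER COLUMN new_cases_last_7d TYPE BIGINT",
        "ALTER TABLE coronacases.incidences "
        "ALTER COLUMN new_cases_last_7d TYPE INTEGER",
    )
]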
To validate the user input in the datenspende project, we need a list of valid German ZIP codes and their corresponding NUTS3 regions. Those are available at https://gisco-services.ec.europa.eu/tercet/flat-files
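A sketch of the ingest; the exact file name and column layout on the GISCO page are guesses and must be checked:

import pandas as pd

# Hypothetical file name; the tercet flat files are published per country
# and NUTS version. pandas reads the zipped CSV directly from the URL.
URL = "https://gisco-services.ec.europa.eu/tercet/flat-files/pc2020_DE_NUTS-2021_v1.0.zip"

# Assumed layout: semicolon-separated, quoted CODE (ZIP) and NUTS3 columns.
zip_to_nuts3 = pd.read_csv(URL, sep=";", quotechar="'")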
Add the responses from question 91 to datenspende_derivates.homogenized_features. They contain information about the type (PCR, Antigen, Antibody) of the test taken.
I'm starting to get a hold of how things work. Generally I prefer templates/examples to include inline comments wherever possible, so that every step is explained. Is that something that could be added?
From what I understand now, the following files are relevant for building my own pipeline:
1. dags/database/migrations/migration_files/20211014_99_move_test_table_to_test_shema.sql
(for defining the structure of the table)
2. dags/csv_download_to_postgres/csv_download_to_postgres.py (for doing the actual work)
3. dags/csv_download_to_postgres/test_csv_download_to_postgres.py
(for testing parts of the pipeline)
4. dags/csv_download_to_postgres/test_integration_csv_download_to_postgres.py
(for testing the entire pipeline)
Which others are necessary to start building a pipeline?
Originally posted by @marcwie in #65 (comment)
To incorporate test results, symptoms and information about users' bodies (height, weight, sex, etc.) we should add a table in datenspende_derivatives that contains that data.
I intend to add two separate tables for
Both should contain the following columns:
Possibly also vaccination status, but I haven't figured out yet where to get it, so this will probably move to a follow-up.
We want to prohibit pushing to master and introduce new code via feature branches. We require passing CI (and potentially preview CD) as well as one approving review for merging. We want to have a bot that does the merging and cleanup if possible.
Currently the management system on Ava has a table that contains config files. These are strings in JSON format with a name like "SuchGreenAnt", which is the current config for the datenspende detections master.
I would like to add this functionality under the new framework, because storing and retrieving configs in a central place provides great value imho.
I propose the following
So that we have access to our file sharing and can mount it into the pipeline management.
We need a pipeline to collect the survey data for the Datenspende project from Thryve.
As a first iteration, we just collect and store the survey data and figure out useful post processing steps later.
Currently, during docker builds, poetry installs/updates all dependencies, even though they are already present and versions don't change. This makes changing dependencies and rebuilding the docker stack slow and should be fixed.
To enable debugging of data, we need to add the questionnaire session to the homogenized features table to be able to track back the origin of feature values to the raw questionnaire data.