hikaya-io / connectors
A flexible data integration tool to help nonprofits connect to their data collection tools and ERP systems
Current behavior
The SurveyCTO DAG fails with the error below:
*** Log file does not exist: /opt/airflow/logs/dots_survey_cto_data_pipeline/Save_data_to_DB/2022-01-23T00:00:00+00:00/2.log
*** Fetching from: http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dots_survey_cto_data_pipeline/Save_data_to_DB/2022-01-23T00:00:00+00:00/2.log
[2022-01-24 00:05:06,084] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dots_survey_cto_data_pipeline.Save_data_to_DB 2022-01-23T00:00:00+00:00 [queued]>
[2022-01-24 00:05:06,107] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dots_survey_cto_data_pipeline.Save_data_to_DB 2022-01-23T00:00:00+00:00 [queued]>
[2022-01-24 00:05:06,108] {taskinstance.py:1087} INFO -
--------------------------------------------------------------------------------
[2022-01-24 00:05:06,108] {taskinstance.py:1088} INFO - Starting attempt 2 of 3
[2022-01-24 00:05:06,108] {taskinstance.py:1089} INFO -
--------------------------------------------------------------------------------
[2022-01-24 00:05:06,158] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): Save_data_to_DB> on 2022-01-23T00:00:00+00:00
[2022-01-24 00:05:06,166] {standard_task_runner.py:52} INFO - Started process 1230 to run task
[2022-01-24 00:05:06,172] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dots_survey_cto_data_pipeline', 'Save_data_to_DB', '2022-01-23T00:00:00+00:00', '--job-id', '160', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/pull_survey_cto_data.py', '--cfg-path', '/tmp/tmpbl7aom4g', '--error-file', '/tmp/tmpb0vzypdc']
[2022-01-24 00:05:06,174] {standard_task_runner.py:77} INFO - Job 160: Subtask Save_data_to_DB
[2022-01-24 00:05:06,468] {logging_mixin.py:104} INFO - Running <TaskInstance: dots_survey_cto_data_pipeline.Save_data_to_DB 2022-01-23T00:00:00+00:00 [running]> on host airflow-worker-0.airflow-worker.default.svc.cluster.local
[2022-01-24 00:05:06,593] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Hikaya-Dots
AIRFLOW_CTX_DAG_ID=dots_survey_cto_data_pipeline
AIRFLOW_CTX_TASK_ID=Save_data_to_DB
AIRFLOW_CTX_EXECUTION_DATE=2022-01-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-23T00:00:00+00:00
[2022-01-24 00:05:06,594] {pull_survey_cto_data.py:44} INFO - Loading data from SurveyCTO server lutheranworld...
[2022-01-24 00:05:06,847] {surveycto.py:69} ERROR - Error getting list of SurveyCTO forms
[2022-01-24 00:05:06,848] {surveycto.py:70} ERROR - 401 Client Error: 401 for url: https://lutheranworld.surveycto.com/console/forms-groups-datasets/get
[2022-01-24 00:05:06,848] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/repo/connectors/DAGs/pull_survey_cto_data.py", line 54, in import_forms_and_submissions
forms = scto_client.get_all_forms()
File "/opt/airflow/dags/repo/connectors/DAGs/helpers/surveycto.py", line 71, in get_all_forms
raise e
File "/opt/airflow/dags/repo/connectors/DAGs/helpers/surveycto.py", line 61, in get_all_forms
forms_request = self.session.get(
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 555, in get
return self.request('GET', url, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 662, in send
r = dispatch_hook('response', hooks, r, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/hooks.py", line 31, in dispatch_hook
_hook_data = hook(hook_data, **kwargs)
File "/opt/airflow/dags/repo/connectors/DAGs/helpers/requests.py", line 58, in <lambda>
assert_status_hook = lambda response, *args, **kwargs: response.raise_for_status()
File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 943, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 401 Client Error: 401 for url: https://lutheranworld.surveycto.com/console/forms-groups-datasets/get
[2022-01-24 00:05:06,852] {taskinstance.py:1544} INFO - Marking task as UP_FOR_RETRY. dag_id=dots_survey_cto_data_pipeline, task_id=Save_data_to_DB, execution_date=20220123T000000, start_date=20220124T000506, end_date=20220124T000506
[2022-01-24 00:05:06,907] {local_task_job.py:149} INFO - Task exited with return code 1
The data imported from SurveyCTO into PostgreSQL is being saved into the default database. This can become unmanageable and may cause conflicts, for example when two servers each have a form with the same name.
For the sake of isolation, create a dedicated PostgreSQL database for each SurveyCTO integration.
Deleting a client's data will also be cleaner and less risky in the future, and it may help with managing access to the loaded data.
Acceptance Criteria
When a SurveyCTO server is hooked, create a dedicated PostgreSQL database.
Forms loaded as tables will be inside said database.
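A minimal sketch of what the provisioning step could look like, assuming SQLAlchemy is available; the function and variable names here are illustrative, not taken from the repo:

```python
from sqlalchemy import create_engine, text

def create_server_database(admin_uri: str, server_name: str) -> None:
    """Create a dedicated database for one SurveyCTO server (hypothetical helper)."""
    # CREATE DATABASE cannot run inside a transaction, so use AUTOCOMMIT.
    engine = create_engine(admin_uri, isolation_level="AUTOCOMMIT")
    db_name = f"surveycto_{server_name}".lower()
    with engine.connect() as conn:
        exists = conn.execute(
            text("SELECT 1 FROM pg_database WHERE datname = :name"),
            {"name": db_name},
        ).scalar()
        if not exists:
            # Identifiers cannot be bound as parameters; db_name is derived
            # from a configured server name here, not arbitrary user input.
            conn.execute(text(f'CREATE DATABASE "{db_name}"'))
```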
We would like to automate the process of making SurveyCTO data available to the DB that connects to Superset.
As a user creates and publishes a SurveyCTO form, submission data starts coming in.
At that point, we would like to watch the incoming submissions and trigger the automatic creation of a new table in the DB, so that the data is available to Superset and the user can build new charts/graphs on it.
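A hypothetical sketch of that flow, assuming the SurveyCTO client already used by the DAG; `create_table_for_form` and `load_submissions` are placeholders, not existing code:

```python
from sqlalchemy import create_engine, inspect

def sync_new_forms(scto_client, postgres_uri):
    """Create a table for any newly published form so Superset can see it."""
    engine = create_engine(postgres_uri)
    existing = set(inspect(engine).get_table_names())
    for form in scto_client.get_all_forms():
        if form["id"] not in existing:
            create_table_for_form(engine, form)  # placeholder helper
            load_submissions(engine, form)       # placeholder helper
```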
Currently, certain methods in the DAGs do not have proper error handling.
For example, the pull_survey_cto_data.get_form_data method in connectors/DAGs/pull_survey_cto_data.py (line 61 at commit 9e16ecf).
Acceptance Criteria
Identify parts of the DAGs where sensitive requests are being made (remote APIs, DB operations...) and use Python exception handling as you see fit:
try/except blocks
logger.exception(e)
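A minimal sketch of the pattern applied to the form-fetching step; the surrounding names follow the DAG, but the signature and body are illustrative:

```python
import logging

logger = logging.getLogger(__name__)

def import_forms_and_submissions(scto_client, **context):
    try:
        # Remote API call: the kind of "sensitive request" to wrap.
        forms = scto_client.get_all_forms()
    except Exception as e:
        logger.exception(e)  # logs the message with the full traceback
        raise                # re-raise so Airflow still marks the task failed
    return forms
```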
Current behavior
After the refactor in #16, task notifications seem to fail
Screenshots
[2021-03-25 10:04:09,652] {{taskinstance.py:968}} ERROR - Failed when executing success callback
[2021-03-25 10:04:09,652] {{taskinstance.py:969}} ERROR - name 'SLACK_CONN_ID' is not defined
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    task.on_success_callback(context)
  File "/usr/local/airflow/dags/helpers/task_utils.py", line 18, in callback
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
NameError: name 'SLACK_CONN_ID' is not defined
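The traceback points at task_utils.py using SLACK_CONN_ID before it is defined or imported. A hedged sketch of a fix; the connection id "slack" matches the logs elsewhere in this repo, the rest is illustrative:

```python
from airflow.hooks.base_hook import BaseHook

SLACK_CONN_ID = "slack"  # must match the Airflow connection id

def callback(context):
    # Previously raised NameError because SLACK_CONN_ID was undefined here.
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
```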
Is your feature request related to a problem? Please describe.
Support debugging/logging options depending on an Airflow variable.
Acceptance Criteria
Give the Airflow user the ability to enable/disable debugging/logging by editing an Airflow variable.
Debugging details must be enabled on the DB engine and the Requests client to inspect interactions with the DB and external APIs.
Additional context
The Requests session used supports a debug parameter: https://github.com/hikaya-io/connectors/blob/master/DAGs/helpers/requests.py#L30
The SQLAlchemy create_engine method used supports an echo parameter.
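An illustrative wiring of the two, assuming an Airflow Variable named "debug"; the variable name, POSTGRES_URI, and the exact helper name are assumptions:

```python
from airflow.models import Variable
from sqlalchemy import create_engine

DEBUG = Variable.get("debug", default_var="false").lower() == "true"

# Echo every SQL statement when debugging is enabled.
engine = create_engine(POSTGRES_URI, echo=DEBUG)  # POSTGRES_URI: assumed config

# The Requests helper in DAGs/helpers/requests.py already accepts a
# debug flag; the function name here is assumed.
session = get_session(debug=DEBUG)
```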
We would like to review and refactor the code for the following dag: https://github.com/hikaya-io/connectors/blob/master/DAGs/export_newdea_db.py
We would like to clean up the variables on Airflow so that only valid variables exist
We would like to integrate MS Teams for DAG notifications
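A hypothetical sketch of a Teams callback, assuming the webhook URL is stored in an Airflow Variable named "msteams_webhook":

```python
import requests
from airflow.models import Variable

def notify_teams(context):
    webhook_url = Variable.get("msteams_webhook")
    payload = {
        "text": (
            f"DAG {context['dag'].dag_id}: task "
            f"{context['task_instance'].task_id} finished with state "
            f"{context['task_instance'].state}"
        )
    }
    # Teams incoming webhooks accept a simple JSON body with a "text" field.
    requests.post(webhook_url, json=payload, timeout=10)
```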
We would like to review and refactor any helpers used in Airflow DAGs: https://github.com/hikaya-io/connectors/tree/master/DAGs/helpers
Current behavior
For the SurveyCTO DAG using the CSV import (https://github.com/hikaya-io/connectors/blob/master/DAGs/pull_survey_cto_data_csv.py), the import of some forms and repeat groups fails because the form IDs and field names being retrieved are wrong and include some unexpected additional characters:
This can be either because:
Expected behavior
For the form ID and field names to be fetched cleanly, without those suffixes and prefixes.
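Since the issue does not show the exact stray characters, the following is only a generic illustration of cleaning identifiers before using them:

```python
import re

def clean_identifier(raw: str) -> str:
    # Keep letters, digits and underscores; drop anything else.
    return re.sub(r"[^A-Za-z0-9_]", "", raw)
```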
Current behavior
When the SurveyCTO DAG using the CSV import (https://github.com/hikaya-io/connectors/blob/master/DAGs/pull_survey_cto_data_csv.py) finishes executing, the following error occurs when it is supposed to send a Slack notification:
....
[2021-04-01 20:00:54,058] {{python_operator.py:114}} INFO - Done. Returned value was: None
[2021-04-01 20:00:54,190] {{logging_mixin.py:95}} INFO - [2021-04-01 20:00:54,190] {{base_hook.py:84}} INFO - Using connection to: id: slack. Host: https://hooks.slack.com/services/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, Port: None, Schema: , Login: , Password: XXXXXXXX, extra: {}
[2021-04-01 20:00:54,198] {{logging_mixin.py:95}} INFO - [2021-04-01 20:00:54,197] {{base_hook.py:84}} INFO - Using connection to: id: slack. Host: https://hooks.slack.com/services/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, Port: None, Schema: , Login: , Password: XXXXXXXX, extra: {}
[2021-04-01 20:00:54,198] {{logging_mixin.py:95}} INFO - [2021-04-01 20:00:54,198] {{http_hook.py:131}} INFO - Sending 'POST' to url: https://hooks.slack.com/services/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
[2021-04-01 20:00:54,227] {{logging_mixin.py:95}} WARNING - /usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py:1004: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning,
[2021-04-01 20:00:58,543] {{logging_mixin.py:95}} INFO - [2021-04-01 20:00:58,542] {{local_task_job.py:105}} INFO - Task exited with return code 0
Since the DAGs are expected to make many requests to third-party APIs, more robust and resilient use of Python Requests would be appreciated; this can be achieved based on the article Advanced usage of Python requests - timeouts, retries, hooks.
Acceptance criteria
Define and use a utility function that returns a Python Requests object, or a Session, that implements the practices from the article above: default timeouts, automatic retries, and response hooks.
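A minimal sketch following the linked article; names such as get_session and TimeoutHTTPAdapter are illustrative:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

DEFAULT_TIMEOUT = 10  # seconds

class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default timeout to every request."""

    def __init__(self, *args, timeout=DEFAULT_TIMEOUT, **kwargs):
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)

def get_session() -> requests.Session:
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS"],  # method_whitelist on urllib3 < 1.26
    )
    adapter = TimeoutHTTPAdapter(max_retries=retries)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    # Raise on 4xx/5xx so callers don't have to check status codes.
    session.hooks["response"].append(
        lambda response, *args, **kwargs: response.raise_for_status()
    )
    return session
```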
As discussed and suggested by @michaelbukachi in #25 (comment), refactor the utility PostgreSQL code to use SQLAlchemy Core.
This will avoid security risks, make the code more readable and maintainable, and open up the full feature set of SQLAlchemy.
Please note that the PostgreSQL code is used by all of the DAGs and is critical to them, so make sure the refactoring stays compatible with the DAGs.
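For illustration, the Core style the refactor would move toward; the table definition and POSTGRES_URI are made up for this sketch:

```python
from sqlalchemy import MetaData, Table, Column, String, create_engine

engine = create_engine(POSTGRES_URI)  # POSTGRES_URI: assumed config value
metadata = MetaData()

forms = Table(
    "forms", metadata,
    Column("id", String, primary_key=True),
    Column("title", String),
)
metadata.create_all(engine)

with engine.begin() as conn:
    # Core compiles expressions with bound parameters, so there is no
    # manual SQL string building (and no injection risk from values).
    conn.execute(forms.insert().values(id="form_1", title="Baseline"))
    rows = conn.execute(forms.select()).fetchall()
```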
Current behavior
When saving forms into the PostgreSQL database, we hit the hard limit of 63 characters on table names.
Currently, we use the form's ID as a unique table name, but some forms have very long IDs/titles.
Moreover, since we store repeat groups separately, those tables are named by concatenating the form's ID and the repeat group's name, which can lead to very long names.
Suggested fix
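The issue does not spell the fix out; one common approach (an assumption, not necessarily the repo's choice) is deterministic truncation with a short hash suffix so shortened names stay unique:

```python
import hashlib

PG_MAX_IDENTIFIER = 63  # PostgreSQL's default identifier length limit

def safe_table_name(name: str) -> str:
    if len(name) <= PG_MAX_IDENTIFIER:
        return name
    # 8 hex chars of the digest keep truncated names distinct.
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    return f"{name[:PG_MAX_IDENTIFIER - 9]}_{digest}"
```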
We would like to review and refactor the code for the following dag: https://github.com/hikaya-io/connectors/blob/master/DAGs/pull_survey_cto_data.py
After observing the Newdea data export for a few weeks, we can confirm that the integration is working well. After one final review of the integration, we would like to increase the frequency to nightly so that the data in Superset is more up to date.
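The change itself would be a one-liner on the DAG definition; an illustrative sketch, with the dag id taken from the repo and the other arguments assumed:

```python
from datetime import datetime
from airflow import DAG

dag = DAG(
    "export_newdea_db",
    start_date=datetime(2022, 1, 1),
    schedule_interval="0 0 * * *",  # run nightly at midnight UTC
    catchup=False,
)
```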