vincentclaes / datajob
Build and deploy a serverless data pipeline on AWS with no effort.
Home Page: https://pypi.org/project/datajob/
License: Apache License 2.0
Python: >=3.6,<4.0.0
Do you wish to deploy these changes (y/n)? y
data-pipeline-simple-dev: deploying...
❌ data-pipeline-simple-dev failed: Error: This stack uses assets, so the toolkit stack must be deployed to the environment (Run "cdk bootstrap aws://077590795309/eu-west-1")
at Object.addMetadataAssetsToManifest (/usr/local/lib/node_modules/aws-cdk/lib/assets.ts:27:11)
at Object.deployStack (/usr/local/lib/node_modules/aws-cdk/lib/api/deploy-stack.ts:205:29)
at processTicksAndRejections (internal/process/task_queues.js:93:5)
at CdkToolkit.deploy (/usr/local/lib/node_modules/aws-cdk/lib/cdk-toolkit.ts:180:24)
at initCommandLine (/usr/local/lib/node_modules/aws-cdk/bin/cdk.ts:204:9)
This stack uses assets, so the toolkit stack must be deployed to the environment (Run "cdk bootstrap aws://077590795309/eu-west-1")
Traceback (most recent call last):
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/bin/datajob", line 5, in <module>
run()
File "/Users/vincent/Workspace/datajob/datajob/datajob.py", line 20, in run
app()
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/typer/main.py", line 214, in __call__
return get_command(self)(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/datajob-KxqvMF6C-py3.6/lib/python3.6/site-packages/typer/main.py", line 497, in wrapper
return callback(**use_params) # type: ignore
File "/Users/vincent/Workspace/datajob/datajob/datajob.py", line 51, in deploy
call_cdk(command="deploy", args=args, extra_args=extra_args)
File "/Users/vincent/Workspace/datajob/datajob/datajob.py", line 103, in call_cdk
subprocess.check_call(shlex.split(full_command))
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['cdk', 'deploy', '--app', 'python /Users/vincent/Workspace/datajob/examples/data_pipeline_simple/datajob_stack.py', '-c', 'stage=dev']' returned non-zero exit status 1.
(datajob-KxqvMF6C-py3.6) Vincents-MacBook-Pro:data_pipeline_simple vincent$
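The error itself names the fix: a stack that uses assets needs the CDK toolkit stack bootstrapped once per account and region before the first deploy, with the exact command cdk suggests:

cdk bootstrap aws://077590795309/eu-west-1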
python setup.py bdist_wheel requires wheel to be installed
The current requirement is ^1.1.2, while version 2.1 is available.
Upgrading would allow compatibility with sagemaker version 2.
with StepfunctionsWorkflow(
    datajob_stack=mailswitch_stack, name="workflow"
) as step_functions_workflow:
    join_labels >> ...
it might also be easier to execute a workflow that has the same name as the stack:
cdk run
this triggers the step functions workflow
implement sagemaker stepfunctions in datajob.
check the linked PRs
check this in advance and raise an error from within datajob:
Unable to resolve AWS account to use. It must be either configured when you define your CDK or through the environment
Traceback (most recent call last):
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/bin/datajob", line 8, in <module>
sys.exit(run())
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob.py", line 17, in run
app()
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/typer/main.py", line 214, in __call__
return get_command(self)(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/typer/main.py", line 497, in wrapper
return callback(**use_params) # type: ignore
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob.py", line 37, in deploy
call_cdk(command="deploy", args=args, extra_args=extra_args)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob.py", line 73, in call_cdk
subprocess.check_call(shlex.split(full_command))
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['cdk', 'deploy', '--app', 'python /Users/vincent/Workspace/zippo-data-layer/deployment_zippo.py', '-c', 'stage=stg']' returned non-zero exit status 1.
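A minimal sketch of the proposed pre-check (the function name and the exact variables checked are illustrative assumptions, not datajob's actual logic):

import os

def assert_aws_account_configured() -> None:
    # fail fast with a clear message instead of letting cdk die later
    # with "Unable to resolve AWS account to use"
    if not (os.environ.get("AWS_DEFAULT_ACCOUNT") or os.environ.get("AWS_PROFILE")):
        raise EnvironmentError(
            "no AWS account configured; set AWS_DEFAULT_ACCOUNT or AWS_PROFILE"
        )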
The definition of our pipeline can be found in examples/data_pipeline_simple/datajob_stack.py, and here below:
we now support python 3.7.
remember that glue 1.0 is python 3.6 ....
datajob/datajob/datajob_stack.py
Line 62 in 151544d
creating the resources now happens in the datajob stack.
shouldn't we do it when creating the GlueJob(), as the final step of the init function?
because of how it's implemented now, we assume all resources should implement a "create" function:
DataJobBase
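A minimal sketch of the alternative being discussed, assuming DataJobBase is the shared abstract base (the constructor arguments and the print are illustrative, not the real datajob internals):

from abc import ABC, abstractmethod

class DataJobBase(ABC):
    @abstractmethod
    def create(self) -> None:
        """Create the AWS resources backing this job."""

class GlueJob(DataJobBase):
    def __init__(self, unique_name: str) -> None:
        self.unique_name = unique_name
        # alternative: create the resources as the final step of __init__
        # instead of having the datajob stack call create() on each resource
        self.create()

    def create(self) -> None:
        print(f"creating glue job {self.unique_name}")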
and implement the requirements for an ECS Fargate task/job?
use these arguments to define how we should package the project
right now we return the default value "dev" no matter which stage value we pass
step functions workflow should first inherit from the datajob stack before checking env vars
Traceback (most recent call last):
File "/Users/vincent/Workspace/zippo-data-layer/deployment_zippo.py", line 87, in <module>
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name=stackname) as sfn:
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/jsii/_runtime.py", line 83, in __call__
inst = super().__call__(*args, **kwargs)
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/stepfunctions/stepfunctions_workflow.py", line 53, in __init__
self.region = region if region else os.environ["AWS_DEFAULT_REGION"]
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/os.py", line 669, in __getitem__
raise KeyError(key) from None
KeyError: 'AWS_DEFAULT_REGION'
self.region = region if region else os.environ["AWS_DEFAULT_REGION"]
make this work:
[t1 >> t2, t3 >> t4]
https://stackoverflow.com/a/50661182/1771155
task >> ...
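A sketch of the operator plumbing this needs, along the lines of the linked answer: give tasks both __rshift__ and __rrshift__ so a plain list can sit on either side of >> (all names here are illustrative):

class Task:
    def __init__(self, name: str):
        self.name = name

    def __repr__(self) -> str:
        return self.name

    def __rshift__(self, other):
        # self >> other, where other is a Task or a list of Tasks
        print(f"{self!r} >> {other!r}")
        return other

    def __rrshift__(self, other):
        # invoked for [t1, t2] >> self, since list has no __rshift__
        print(f"{other!r} >> {self!r}")
        return self

t1, t2, t3, t4 = (Task(n) for n in ("t1", "t2", "t3", "t4"))
[t1 >> t2, t3 >> t4]  # each chain inside the list registers its own edge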
# We instantiate a step functions workflow and orchestrate the glue jobs.
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow", notification="some-email...") as sfn:
    task1 >> task2
this is the broken link: https://github.com/vincentclaes/datajob/tree/add-simple-example/examples/data_pipeline_simple
I suppose this is the correct link: https://github.com/vincentclaes/datajob/tree/main/examples/data_pipeline_simple
but the subsequent links are also invalid.
The "None" has a capital letter, which is invalid.
I ran:
export AWS_DEFAULT_ACCOUNT=_____________29
export AWS_PROFILE=my-profile
export AWS_DEFAULT_REGION=your-region # e.g. eu-west-1
<..>/datajob/examples/data_pipeline_simple$ datajob deploy --config datajob_stack.py
cdk command: cdk deploy --app "python <..>/datajob/examples/data_pipeline_simple/datajob_stack.py" -c stage=None
jsii.errors.JavaScriptError:
Error: Invalid S3 bucket name (value: data-pipeline-simple-None-deployment-bucket)
Bucket name must only contain lowercase characters and the symbols, period (.) and dash (-) (offset: 21)
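A defensive sketch of the check that would catch this earlier (the function name is illustrative): reject a stage that S3 would refuse in a bucket name, including the literal "None" that leaks in when no stage is given:

import re

def validate_stage(stage: str) -> str:
    # S3 bucket names only allow lowercase characters, digits, periods and dashes
    if not stage or stage == "None" or not re.fullmatch(r"[a-z0-9.-]+", stage):
        raise ValueError(f"invalid stage for an S3 bucket name: {stage!r}")
    return stage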
hi V, I noticed that the workflow fails if the last step is not a unique job.
Ex: task1 >> [task2, task3] fails
but [task1, task2] >> task3 works
(in my example each task is independent)
is this the expected behavior?
(node:17808) ExperimentalWarning: The fs.promises API is experimental
python: can't open file 'deployment_glue_datajob.py': [Errno 2] No such file or directory
Subprocess exited with error 2
DVCL643@10NB03610:~/workspace/python/aws_best_practices$ cd glue
DVCL643@10NB03610:~/workspace/python/aws_best_practices/glue$ cdk deploy --app "python deployment_glue_datajob.py"
(node:10368) ExperimentalWarning: The fs.promises API is experimental
Traceback (most recent call last):
File "deployment_glue_datajob.py", line 60, in <module>
python_job >> pyspark_job
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\stepfunctions\stepfunctions_workflow.py", line 115, in __exit__
self._build_workflow()
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\stepfunctions\stepfunctions_workflow.py", line 91, in _build_workflow
self.client = boto3.client("stepfunctions")
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\boto3\__init__.py", line 93, in client
return _get_default_session().client(*args, **kwargs)
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\boto3\session.py", line 263, in client
aws_session_token=aws_session_token, config=config)
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\botocore\session.py", line 826, in create_client
credentials = self.get_credentials()
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\botocore\session.py", line 431, in get_credentials
'credential_provider').load_credentials()
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\botocore\credentials.py", line 1975, in load_credentials
creds = provider.load()
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\botocore\credentials.py", line 1102, in load
credentials = fetcher(require_expiry=False)
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\botocore\credentials.py", line 1137, in fetch_credentials
provider=method, cred_var=mapping['secret_key'])
botocore.exceptions.PartialCredentialsError: Partial credentials found in env, missing: AWS_SECRET_ACCESS_KEY
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "deployment_glue_datajob.py", line 60, in <module>
python_job >> pyspark_job
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\datajob_stack.py", line 74, in __exit__
self.create_resources()
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\datajob_stack.py", line 93, in create_resources
[resource.create() for resource in self.resources]
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\datajob_stack.py", line 93, in <listcomp>
[resource.create() for resource in self.resources]
File "C:\Users\dvcl643\.virtualenvs\glue-E61qYPlY\lib\site-packages\datajob\stepfunctions\stepfunctions_workflow.py", line 104, in create
text_file.write(self.workflow.get_cloudformation_template())
AttributeError: 'NoneType' object has no attribute 'get_cloudformation_template'
Subprocess exited with error 1
let the user add **kwargs to;
what is this all about?
If an error occurs, it looks like the "exit" function of the context manager is called while the workflow might still be None, which raises another exception when trying to create the resources.
KeyError: 'AWS_DEFAULT_REGION'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/vincent/Workspace/zippo-data-layer/deployment_zippo.py", line 91, in <module>
] >> crop_raster_per_country >> dump_data_layer_to_gbq >> dump_display_names_to_gbq
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob_stack.py", line 72, in __exit__
self.create_resources()
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob_stack.py", line 91, in create_resources
[resource.create() for resource in self.resources]
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/datajob_stack.py", line 91, in <listcomp>
[resource.create() for resource in self.resources]
File "/Users/vincent/Library/Caches/pypoetry/virtualenvs/zippo-data-layer-_EDEGVNn-py3.6/lib/python3.6/site-packages/datajob/stepfunctions/stepfunctions_workflow.py", line 102, in create
text_file.write(self.workflow.get_cloudformation_template())
AttributeError: 'NoneType' object has no attribute 'get_cloudformation_template'
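A minimal sketch of the guard suggested above (simplified, not the actual datajob code): only build the workflow in __exit__ when no exception is propagating, so the original error surfaces instead of the AttributeError on the still-None workflow:

class StepfunctionsWorkflow:
    def __init__(self, name: str):
        self.name = name
        self.workflow = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # if the body raised, skip building the workflow; returning False
        # re-raises the original exception instead of masking it
        if exc_type is None:
            self._build_workflow()
        return False

    def _build_workflow(self):
        self.workflow = f"workflow {self.name}"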
now we need to provide a stage.
you could make this optional
now we explicitly specify a region when defining a stepfunctions workflow.
we should let boto3 handle it implicitly,
and also not raise an error when no region is found.
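A sketch of that fallback (an illustrative helper, not the shipped fix): prefer an explicit region, then the environment, then whatever boto3 resolves, without raising:

import os
import boto3

def resolve_region(region: str = None) -> str:
    # explicit argument wins; otherwise fall back to the environment and
    # finally to boto3's own resolution chain; the result may be None
    return (
        region
        or os.environ.get("AWS_DEFAULT_REGION")
        or boto3.session.Session().region_name
    )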
--from-last-error
add a function in the task decorator to look for the context "workflow" and add the task.
look at how Prefect does this: https://github.com/PrefectHQ/prefect
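A sketch of the idea, loosely modeled on how Prefect resolves the active flow from context (every name here is illustrative):

import contextvars

_current_workflow = contextvars.ContextVar("workflow", default=None)

class Workflow:
    def __init__(self, name: str):
        self.name = name
        self.tasks = []

    def __enter__(self):
        self._token = _current_workflow.set(self)
        return self

    def __exit__(self, *exc):
        _current_workflow.reset(self._token)
        return False

def task(fn):
    # when the decorated callable runs inside a `with Workflow(...)` block,
    # register it on the active workflow
    def wrapper(*args, **kwargs):
        workflow = _current_workflow.get()
        if workflow is not None:
            workflow.tasks.append(fn.__name__)
        return fn(*args, **kwargs)
    return wrapper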
aws-cdk-stepfunctions
if you have the same task name and stage over 2 different pipelines, you will have a conflict.
e.g. name "task1" and stage "stg" will result in task1-stg.
we need to prefix this with our stack name, e.g. my-stack-task1-stg
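A sketch of the proposed naming (the helper name is illustrative):

def unique_task_name(stack_name: str, task_name: str, stage: str) -> str:
    # prefix with the stack name so two pipelines can both define "task1"
    # in the same stage without colliding
    return f"{stack_name}-{task_name}-{stage}"

unique_task_name("my-stack", "task1", "stg")  # 'my-stack-task1-stg'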
@stepfunctions_workflow.task
class SomeMockedClass(object):
    def __init__(self, unique_name):
        self.unique_name = unique_name
        self.sfn_task = Task(state_id=unique_name)
this should better resemble reality
On a Linux box with conda, this is one way to get pytest running.
/home/peter_v/anaconda3/bin/python -m pip install --upgrade pip # to avoid warnings about spyder 4.1.5 versions
make
sudo apt install nodejs # to avoid massive warnings about RuntimeError: generator didn't stop after throw()
$ poetry run pytest
========================================== test session starts ===========================================
platform linux -- Python 3.8.2, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /home/peter_v/data/github/vincentclaes/datajob
collected 16 items
datajob_tests/test_datajob_context.py . [ 6%]
datajob_tests/test_datajob_stack.py .... [ 31%]
datajob_tests/datajob_cli_tests/test_datajob_deploy.py ....... [ 75%]
datajob_tests/datajob_cli_tests/test_datajob_execute.py . [ 81%]
datajob_tests/glue/test_glue_job.py . [ 87%]
datajob_tests/stepfunctions/test_stepfunctions_workflow.py .. [100%]
=========================================== 16 passed in 5.62s ===========================================
do not use shell=True when running a command.
more info: https://stackoverflow.com/a/64341833/1771155
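A minimal sketch of the safer pattern, matching what call_cdk in the tracebacks above already does with shlex.split:

import shlex
import subprocess

def run_command(full_command: str) -> None:
    # split the command into an argv list and run it without a shell,
    # so nothing in the string gets interpreted by a shell
    subprocess.check_call(shlex.split(full_command))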
e.g. to access the data bucket: datajob_stack.datajob_context.data_bucket_name