Comments (8)
Thanks @soumilshah1995 for filing. Yea, I noticed in #23 that something changed with Poetry 1.6.x and it no longer installs on Python 3.7, even though 3.7 support wasn't (officially) deprecated until 1.6.1 I think.
That said, 1.5.x still works but I should fix this in the Dockerfile template.
from amazon-emr-cli.
We need to do this in the Dockerfile:
# FIX
RUN curl -sSL https://install.python-poetry.org | python3 - --version 1.1.11
# REMOVE THIS
# RUN curl -sSL https://install.python-poetry.org | python3 -
from amazon-emr-cli.
There is one more issue.
I have been working on a project where I use boto3 to create an SQS client.
It looks like, because Python 3.7 is deprecated, using the SQS client with the EMR CLI throws an error and shows a warning in the logs. My suggestion would be to upgrade the Python version in the template as well.
/home/hadoop/environment/lib64/python3.7/site-packages/boto3/compat.py:82: PythonDeprecationWarning: Boto3 will no longer support Python 3.7 starting December 13, 2023. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.8 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/
warnings.warn(warning, PythonDeprecationWarning)
from amazon-emr-cli.
Yea, the CLI maintains compatibility with 3.7 today because EMR still uses 3.7. Once that changes, I'll deprecate 3.7 support.
from amazon-emr-cli.
Any suggestions on how to fix the issue? I want to use the boto3 SQS client, but it looks like it does not work as intended because of the older Python version. Any suggestion would be great.
from amazon-emr-cli.
That's just a warning, right? The message indicates boto3 updates for Python 3.7 will no longer be available after December, but for now you should be fine.
Does that warning message cause any errors?
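To illustrate the question: Python warnings are reported but do not raise by default, so a deprecation notice like the one above should not, on its own, stop a client call. A minimal sketch, using a hypothetical stand-in class `FakeDeprecationWarning` and a hypothetical `create_client` function rather than boto3's real ones:

```python
import warnings


class FakeDeprecationWarning(Warning):
    """Hypothetical stand-in for a library's deprecation warning class."""


def create_client():
    # Emit a warning the way a library might when a client is created.
    warnings.warn("Python 3.7 support ends soon", FakeDeprecationWarning)
    return "client-ready"


# With normal filters the warning is recorded and execution continues.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = create_client()

warning_count = len(caught)

# Only when warnings are escalated to errors does the call fail.
with warnings.catch_warnings():
    warnings.simplefilter("error", FakeDeprecationWarning)
    try:
        create_client()
        escalated = False
    except FakeDeprecationWarning:
        escalated = True
```

If the job does fail, the cause is more likely something other than the warning itself, unless the runtime is configured to turn warnings into errors.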
from amazon-emr-cli.
Yes, I am not able to poll my SQS queue for messages.
I was using S3 events and consuming the SQS messages in EMR.
I tried executing the code on EMR Serverless using the EMR CLI and had a really tough time getting it to work because the boto3 SQS client was not working correctly. I was using EMR 6.9.
Code:
import sys, json
from pyspark.sql import SparkSession
import boto3

spark = SparkSession \
    .builder \
    .appName("hudi_cow") \
    .getOrCreate()

spark = (
    SparkSession.builder.appName("SparkSQL")
    .enableHiveSupport()
    .getOrCreate()
)


class Poller:
    def __init__(self, queue_url):
        self.queue_url = queue_url
        print("IN CLASS")
        self.sqs_client = boto3.client('sqs')
        print("Creating client object BOTO 3")
        self.batch_size = 10
        self.messages_to_delete = []

    def get_messages(self, batch_size):
        response = self.sqs_client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=batch_size,
            WaitTimeSeconds=20
        )
        if 'Messages' in response:
            messages = response['Messages']
            for message in messages:
                print("Message", message)
                self.messages_to_delete.append({
                    'ReceiptHandle': message['ReceiptHandle'],
                    'Body': message['Body']
                })
            return messages
        else:
            return []

    def commit(self):
        for message in self.messages_to_delete:
            self.sqs_client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
        self.messages_to_delete = []


def read_data_s3(path, format):
    if format == "parquet" or format == "json":
        spark_df = spark.read.json(path)
        print(spark_df.show())
        return spark_df


def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write method to use (default is 'upsert').
        index_type (str): BLOOM or GLOBAL_BLOOM.

    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        # The original key carried a stray "hoodie-conf " prefix (a command-line flag), removed here
        "hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "false",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)


def process_message(messages, file_format='json'):
    try:
        batch_files = []
        for message in messages:
            payload = json.loads(message['Body'])
            records = payload['Records']
            s3_files = [f"s3://{record['s3']['bucket']['name']}/{record['s3']['object']['key']}" for record in records]
            print('s3_files', s3_files)
            for item in s3_files: batch_files.append(item)

        if batch_files != []:
            spark_df = read_data_s3(
                path=batch_files,
                format=file_format
            )
            print("**************")
            spark_df.show()
            print("**************")
            upsert_hudi_table(
                glue_database="hudidb",
                table_name="customers",
                record_id="customer_id",
                precomb_key="ts",
                table_type='COPY_ON_WRITE',
                partition_fields="state",
                method='upsert',
                index_type='BLOOM',
                enable_partition=True,
                enable_cleaner=True,
                enable_hive_sync=False,
                enable_clustering=False,
                clustering_column='default',
                enable_meta_data_indexing='true',
                use_sql_transformer=False,
                sql_transformer_query='default',
                target_path="s3://datateam-sandbox-qa-demo/silver/table_name=customers/",
                spark_df=spark_df,
            )
    except Exception as e:
        print("Error processing message:", e)
        raise Exception("Error processing message:", e)


def run_job():
    print("IN")
    queue_url = '<URL>'
    print(queue_url)
    print("*")
    poller = Poller(queue_url)
    print("Entering while loop")
    while True:
        messages = poller.get_messages(poller.batch_size)
        if not messages:
            print("No messages to process. Exiting.")
            break
        else:
            process_message(messages)
            poller.commit()


run_job()
I could not get the boto3 SQS client to work for some reason.
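One part of the job that can be checked without AWS access is the step that turns an SQS message body into S3 paths. The sketch below is an assumption about how that step could be made more defensive, not a diagnosis of the failure: the helper name `s3_paths_from_sqs_body` and the sample bucket and key are hypothetical. It tolerates bodies without a `Records` key (S3 sends a test event of that shape when a notification is configured) and URL-decodes object keys, which S3 event notifications encode.

```python
import json
from urllib.parse import unquote_plus


def s3_paths_from_sqs_body(body):
    """Return s3:// URIs for every record in one S3 event notification body."""
    payload = json.loads(body)
    paths = []
    # A body without "Records" (for example an S3 test event) yields no paths.
    for record in payload.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded, with spaces sent as "+".
        key = unquote_plus(record["s3"]["object"]["key"])
        paths.append(f"s3://{bucket}/{key}")
    return paths


# Hypothetical sample body, shaped like an S3 "ObjectCreated" notification.
sample_body = json.dumps({
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "raw/customers/part+0001.json"},
            }
        }
    ]
})

paths = s3_paths_from_sqs_body(sample_body)
test_event_paths = s3_paths_from_sqs_body(json.dumps({"Event": "s3:TestEvent"}))
```

With the direct `payload['Records']` lookup used in the job, a body without that key would raise a `KeyError`, which may be worth ruling out when reading the job logs.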
from amazon-emr-cli.
Ah got it. Does your EMR Serverless job error out at all? I haven't made any SQS samples, but might be able to check tomorrow.
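To help narrow down whether the polling logic or the AWS call is at fault, the drain loop can be exercised locally. This is a minimal sketch under stated assumptions: `FakeSqsClient` and `drain_queue` are hypothetical names, and the fake is an in-memory stand-in for `boto3.client('sqs')` that only mimics the `receive_message` and `delete_message` keyword arguments used in the code above.

```python
class FakeSqsClient:
    """Hypothetical in-memory stand-in for boto3.client('sqs')."""

    def __init__(self, bodies):
        self._pending = [
            {"ReceiptHandle": f"rh-{i}", "Body": body}
            for i, body in enumerate(bodies)
        ]
        self.deleted = []

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = self._pending[:MaxNumberOfMessages]
        self._pending = self._pending[MaxNumberOfMessages:]
        # Omit the "Messages" key when nothing is available, as the job code expects.
        return {"Messages": batch} if batch else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)


def drain_queue(client, queue_url, handler, batch_size=10):
    """Poll until the queue is empty; delete a batch only after handler succeeds."""
    processed = 0
    while True:
        response = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=batch_size,
            WaitTimeSeconds=20,
        )
        messages = response.get("Messages", [])
        if not messages:
            return processed
        # An exception raised here propagates and leaves the batch undeleted.
        handler(messages)
        for message in messages:
            client.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
        processed += len(messages)


seen = []
client = FakeSqsClient(["a", "b", "c"])
total = drain_queue(client, "https://example.invalid/queue", seen.extend, batch_size=2)
```

If the loop behaves as expected against the fake, the remaining suspects are on the AWS side (credentials, region, or network access from the job), which the job's error output should help distinguish.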
from amazon-emr-cli.