
spark-hats

Spark "Helpers for Array Transformations"

This library extends the Spark DataFrame API with helpers for transforming fields inside nested structures and arrays at arbitrary levels of nesting.

Usage

Reference the library

Scala 2.11:
  groupId: za.co.absa
  artifactId: spark-hats_2.11
  version: 0.3.0

Scala 2.12:
  groupId: za.co.absa
  artifactId: spark-hats_2.12
  version: 0.3.0

Scala 2.13:
  groupId: za.co.absa
  artifactId: spark-hats_2.13
  version: 0.3.0
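
For sbt users, the same dependency can be expressed as follows (shown here for the 0.3.0 release; the %% operator appends the Scala binary version suffix automatically):

    libraryDependencies += "za.co.absa" %% "spark-hats" % "0.3.0"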

Please use the table below to determine which version of spark-hats to use for your version of Spark.

| spark-hats version | Scala version | Spark version |
|--------------------|---------------|---------------|
| 0.1.x              | 2.11, 2.12    | 2.4.3+        |
| 0.2.x              | 2.11, 2.12    | 2.4.3+        |
| 0.2.x              | 2.12          | 3.0.0+        |
| 0.3.x              | 2.11          | 2.4.3+        |
| 0.3.x              | 2.12, 2.13    | 3.2.1+        |

To use the extensions you need to add this import to your Spark application or shell:

import za.co.absa.spark.hats.Extensions._
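
After the import, the nested* extension methods described below become available on any DataFrame. A minimal sketch using the basic API covered later in this README:

    import org.apache.spark.sql.functions.lit

    df.nestedWithColumn("my_array.c", lit("hello"))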

How to generate the code coverage report

sbt ++{matrix.scala} jacoco -DSPARK_VERSION={matrix.spark}

The code coverage report will be generated at:

{project-root}/spark-hats/target/scala-{scala_version}/jacoco/report/html

Motivation

Here is a small example that we will use to show how spark-hats works. The important detail is that the DataFrame contains an array of struct fields.
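
If you would like to follow along, here is one way to construct such a DataFrame (a minimal sketch intended for spark-shell, not taken from the library; exact nullability flags may differ slightly from the output below):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Case classes give the array elements the named fields 'a' and 'b'.
    case class Item(a: Long, b: String)
    case class Record(id: Long, my_array: Seq[Item])

    val df = Seq(
      Record(1L, Seq(Item(1L, "foo"))),
      Record(2L, Seq(Item(1L, "bar"), Item(2L, "baz"), Item(3L, "foz")))
    ).toDS().toDF()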

scala> df.printSchema()
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
       
scala> df.show(false)
+---+------------------------------+
|id |my_array                      |
+---+------------------------------+
|1  |[[1, foo]]                    |
|2  |[[1, bar], [2, baz], [3, foz]]|
+---+------------------------------+

Now, say, we want to add a field c as part of the struct alongside a and b from the example above. The expression for c is c = a + 1.

Here is the code you would need in plain Spark:

    import org.apache.spark.sql.functions._
    import za.co.absa.spark.hofs._  // provides transform() for the Scala API on Spark 2.4

    val dfOut = df.select(col("id"), transform(col("my_array"), c => {
      struct(c.getField("a").as("a"),
        c.getField("b").as("b"),
        (c.getField("a") + 1).as("c"))
    }).as("my_array"))

(To use transform() from the Scala API on Spark 2.4, you need to add spark-hofs as a dependency; Spark 3.x provides transform() natively.)

Here is how the same transformation looks when using the spark-hats library:

    val dfOut = df.nestedMapColumn("my_array.a", "c", a => a + 1)

Both produce the following results:

scala> dfOut.printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

scala> dfOut.show(false)
+---+---------------------------------------+
|id |my_array                               |
+---+---------------------------------------+
|1  |[[1, foo, 2]]                          |
|2  |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+

Imagine how the plain Spark code would look for deeper levels of array nesting.

Methods

Add a column

The nestedWithColumn method allows adding new fields inside nested structures and arrays.

The column addition API is provided in two flavors: basic and extended. The basic API is simpler to use, but the expressions it accepts can only reference columns at the root of the schema. Here is an example of the basic API:

scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = false)

scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
+---+---------------------------------------------------+
|id |my_array                                           |
+---+---------------------------------------------------+
|1  |[[1, foo, hello]]                                  |
|2  |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
+---+---------------------------------------------------+
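
Since the basic API can reference root-level columns, an expression like the following should also work (a small sketch; the field name d is purely illustrative):

    df.nestedWithColumn("my_array.d", col("id") + 1)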

Add column (extended)

The extended API method nestedWithColumnExtended works similarly to the basic one, but it also allows the caller to reference other array elements, possibly at different levels of nesting. It achieves this with a slightly different signature: the second parameter is no longer a column, but a function that returns a column. This function, in turn, receives an argument that is itself a function: getField(). The transformation can use getField() to reference other columns in the DataFrame by their fully qualified names.

In the following example, a transformation adds a new field my_array.c to the DataFrame by concatenating the root-level column id with the nested field my_array.b:

scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
         concat(getField("id").cast("string"), getField("my_array.b"))
       )

scala> dfOut.printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)

scala> dfOut.show(false)
+---+------------------------------------------------+
|id |my_array                                        |
+---+------------------------------------------------+
|1  |[[1, foo, 1foo]]                                |
|2  |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
+---+------------------------------------------------+
  • Note. You can still use col to reference root-level columns. But if a column is inside an array (like my_array.b), calling col("my_array.b") references the whole array, not an individual element. The getField() function passed to the transformation solves this by providing a generic way to address array elements at arbitrary levels of nesting (see the sketch below).

  • Advanced note. If there are several arrays in the schema, getField() can reference elements of an array if that array is one of the parents of the output column.
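
A small sketch illustrating the note above (assuming the same DataFrame as in the earlier examples):

    // col() still works for root-level columns inside the extended API...
    val dfOut2 = df.nestedWithColumnExtended("my_array.c", getField =>
      concat(col("id").cast("string"), getField("my_array.b"))
    )

    // ...but col("my_array.b") on its own would reference the whole array of
    // b values rather than the current element, which is why nested fields
    // must be accessed through getField().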

Drop a column

The nestedDropColumn method allows dropping fields inside nested structures and arrays.

scala> df.nestedDropColumn("my_array.b").printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)

scala> df.nestedDropColumn("my_array.b").show(false)
+---+---------------+
|id |my_array       |
+---+---------------+
|1  |[[1]]          |
|2  |[[1], [2], [3]]|
+---+---------------+

Map a column

The nestedMapColumn method applies a transformation to a nested field. If the input column is a primitive field, the method adds outputColumnName at the same level of nesting. If the input column is a struct, you can use the .getField(...) method to operate on its children (see the sketch after the example below).

The output column name can omit the full path, since the field will be created at the same level of nesting as the input column.

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
+---+---------------------------------------+
|id |my_array                               |
+---+---------------------------------------+
|1  |[[1, foo, 2]]                          |
|2  |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
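
When the input column is a struct rather than a primitive, the expression receives the whole struct, so its children can be accessed with .getField(...). A hedged sketch (assuming a schema like the one in the Unstruct section below, where my_array.c is a struct with children nestedField1 and nestedField2; the output field d is hypothetical):

    df.nestedMapColumn("my_array.c", "d",
      c => concat(c.getField("nestedField1"), c.getField("nestedField2").cast("string")))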

Other transformations

Unstruct

Syntax: df.nestedUnstruct("NestedStructColumnName").

Flattens one level of nesting when a struct is nested in another struct. For example,

scala> df.printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- nestedField1: string (nullable = true)
 |    |    |    |-- nestedField2: long (nullable = true)

scala> df.nestedUnstruct("my_array.c").printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- nestedField1: string (nullable = true)
 |    |    |-- nestedField2: long (nullable = true)

Note that the output schema doesn't have the c struct. All fields of c are now part of the parent struct.

Changelog

  • 0.3.0 released 3 August 2023.

    • #38 Add Scala 2.13 support.
    • #33 Update Spark tests to 3.2.1.
    • #35 Add code coverage support.
  • 0.2.2 released 8 March 2021.

    • #23 Added nestedUnstruct() method that flattens one level of nesting for a given struct.
  • 0.2.1 released 21 January 2020.

    • #10 Fixed error column aggregation when the input array is null.
  • 0.2.0 released 16 January 2020.

    • #5 Added the extended nested transformation API that allows referencing arbitrary columns.



spark-hats's Issues

Add code coverage support

Background

Add code coverage support to be able to measure code quality.

Feature

The ability to measure current code coverage as one of the QA metrics.

Update:

  • Improve or add code coverage support to be able to measure code quality.
  • The ability to measure current code coverage as one of the QA metrics.
  • Add a GitHub Action to check the coverage of changed files.

Support nested Map transformations

Currently, when a nested Map is encountered, the following error is given:

java.lang.IllegalArgumentException: Field 'someNestedMap' is not a struct type or an array.

This would be a nice-to-have improvement. Are there plans to introduce this feature?

Support dataframes that have maps

Background

Currently, spark-hats does not support transformations inside nested maps (see #15). But it should at least allow processing DataFrames that contain maps, as long as the transformations happen outside the nested maps.

Feature

Allow applying transformations to nested structs and arrays outside of a map when the DataFrame contains one.

Add Unstruct functionality to flatten a nested struct

Background

Currently, there is no way to flatten a struct field at a given level of nesting.

Feature

When used via df.nestedMapColumn(), the unstruct function should project the fields of a nested struct onto the same level as the parent.

Example

For a dataset of the following shape:

root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- nestedField1: string (nullable = true)
 |    |    |    |-- nestedField2: long (nullable = true)

Applying df.nestedMapColumn("my_array.c", "my_array", c => unstruct(c)) should result in

root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- nestedField1: string (nullable = true)
 |    |    |-- nestedField2: long (nullable = true)

Errors are not retained if the input array is null

Background

If a processed array is null, all existing errors in the error column are removed.

Expected behavior

The API that supports adding errors to the error column should retain the errors already present for each record.

Select an element from an array of arrays

Background

If you have an array of arrays, select doesn't work.

Feature

Be able to select a column from an array of arrays.

Example [Optional]

scala> res0.printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- qty: integer (nullable = false)
 |    |    |-- price: double (nullable = false)
 |    |    |-- payments: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- payid: string (nullable = true)
 |    |    |    |    |-- amount: double (nullable = false)

scala> res0.selectFromArray(col("items.payments.payid"))

UDF "arrayDistinctErrors" is called but not registered

In NestedArrayTransformations.scala, the UDF "arrayDistinctErrors" is called but is not registered anywhere in the library (to be precise, it is registered, but only in the test part of the code).
Therefore, the library relies on the user application to define and register the function.

Add advanced nested routines to the extensions

Background

Currently, the DataFrame extensions contain only basic nested routines.

Feature

Add routines that work with the error column as DataFrame extensions as well. Put these routines into a separate implicit class so that the advanced extensions have to be explicitly enabled before use.

Update. (minor) Add an explanation for the 'hats' acronym.

Multiple calls of nestedMapColumn cause dataframe to hang

Describe the bug
Hi there, I am using nestedMapColumn for multiple nested columns in a DataFrame. After multiple uses of nestedMapColumn, any native method I call on the DataFrame hangs, such as .show(), .write(), or .select(). By "hang" I mean it simply blocks the application code when I use a native DataFrame method. When I use nestedMapColumn just once on the DataFrame, it works fine. Is this behavior expected? Is there any workaround?

I am using: libraryDependencies += "za.co.absa" %% "spark-hats" % "0.2.2"

To Reproduce
Steps to reproduce the behavior:

  1. Have a DataFrame with multiple nested fields.
  2. Apply nestedMapColumn to multiple fields.
  3. Try to run outputDataframe.show(), outputDataframe.select(), etc. (see the sketch below).
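
A minimal sketch of this usage pattern (hypothetical field names, assuming the example schema from the README above):

    import org.apache.spark.sql.functions._
    import za.co.absa.spark.hats.Extensions._

    val dfOut = df
      .nestedMapColumn("my_array.a", "a2", a => a + 1)
      .nestedMapColumn("my_array.b", "b2", b => upper(b))
      .nestedMapColumn("my_array.a", "a3", a => a * 2)

    dfOut.show(false) // reportedly blocks after several chained calls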

Expected behavior
I expect the DataFrame not to hang after using nestedMapColumn multiple times.

Move extended array transformations from Enceladus to spark-hats

Background

Extended array transformations were added to Enceladus to support broadcast join on array elements so that join conditions could contain fields on all parent array levels.

Feature

Move the extended array transformations from Enceladus to spark-hats and add a Spark extension interface to them.

How to use spark-hats functions on a DataFrame in PySpark?

This looks like a super helpful extension for dealing with deeply nested fields in Spark. I'd love to see if it can help me with my problems, but I'm using PySpark in Python.

I think it's installing properly with:

from pyspark.sql.session import SparkSession
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'za.co.absa:spark-hats_2.12:0.2.2')
    .getOrCreate()
)

Since I see the following in the logs:

:: resolution report :: resolve 157ms :: artifacts dl 4ms
	:: modules in use:
	za.co.absa#spark-hats_2.12;0.2.2 from central in [default]
	za.co.absa#spark-hofs_2.12;0.4.0 from central in [default]

But then if I create a dataframe and try to access the functions, I'm not having success:

>>> empty_df = spark.createDataFrame([], schema="")
>>> empty_df.nestedWithColumn()
AttributeError: 'DataFrame' object has no attribute 'nestedWithColumn'
>>> empty_df._jdf.nestedWithColumn()
Py4JError: An error occurred while calling o63.nestedWithColumn. Trace:
py4j.Py4JException: Method nestedWithColumn([]) does not exist

So not sure if anyone has experience with PySpark here and has any insights. I'll also update this issue if I find a solution.

Add CI

Feature

Add a Jenkinsfile to run CI.

Add schema projection

Feature

Create a method that, given an input DataFrame and a desired schema, applies the schema to the DataFrame as long as:

  • Matching fields have the same or compatible data types.
  • The desired schema is a subset of the input schema.
