
mse's Introduction


Make Structs Easy (MSE)

This library adds withField, withFieldRenamed, and dropFields methods to the Column class, allowing users to easily add, rename, and drop fields inside StructType columns. The signature and behaviour of these methods are intended to be similar to their Dataset equivalents, namely the withColumn, withColumnRenamed, and drop methods.

The methods themselves are backed by efficient Catalyst Expressions and, as a result, should provide better performance than equivalent UDFs. While this library uses Scala's implicit conversion technique to "monkey patch" the methods onto the Column class, there is an ongoing effort to add these methods natively to the Column class in the Apache Spark SQL project. You can follow the progress of this initiative in SPARK-22231.
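To make the analogy concrete, here is a minimal sketch of the intended correspondence, assuming a hypothetical DataFrame df with a top-level column x and a StructType column s containing a field y (the required imports are covered in the Usage section below):

df.withColumn("x", lit(0))                          // replace a top-level column
df.withColumn("s", $"s".withField("y", lit(0)))     // replace a field inside a struct column
df.withColumnRenamed("x", "z")                      // rename a top-level column
df.withColumn("s", $"s".withFieldRenamed("y", "z")) // rename a field inside a struct column
df.drop("x")                                        // drop a top-level column
df.withColumn("s", $"s".dropFields("y"))            // drop a field inside a struct column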

If you find this project useful, please consider supporting it by giving a star!

Supported Spark versions

MSE works with Spark/PySpark 2.4.x without any further requirements. Scala artifacts are published for both Scala 2.11 and 2.12, and the Python package supports Python 3.x.

Installation

Scala

Stable releases of MSE are published to Maven Central. As such, you can pull in the current stable release simply by adding a library dependency for the desired version to your project. For example, for an SBT project, add the following line to your build.sbt:

libraryDependencies += "com.github.fqaiser94" %% "mse" % "0.2.4"

For other types of projects (e.g. Maven, Gradle), see the installation instructions at this link.

Python

Stable releases of MSE are published to PyPI. You will also need to provide your PySpark application(s) with the path to the MSE jar, which you can get from here. For example:

pip install mse
curl https://repo1.maven.org/maven2/com/github/fqaiser94/mse_2.11/0.2.4/mse_2.11-0.2.4.jar --output mse.jar
pyspark --jars mse.jar

If you get errors like TypeError: 'JavaPackage' object is not callable, this usually indicates that you haven't provided PySpark with the correct path to the MSE jar.

Usage

To bring into scope the (implicit) Column methods in Scala, use:

import com.github.fqaiser94.mse.methods._

To bring into scope the (implicit) Column methods in Python, use:

from mse import *

The rest of the example code below is written in Scala, although the equivalent Python code would look very similar.

You can now use these methods to manipulate fields in a top-level StructType column:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._   // for lit
import org.apache.spark.sql.types._

// Generate some example data
val structLevel1 = spark.createDataFrame(sc.parallelize(
  Row(Row(1, null, 3)) :: Nil),
  StructType(Seq(
    StructField("a", StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType))))))).cache
      
structLevel1.show
// +-------+                                                                       
// |      a|
// +-------+
// |[1,, 3]|
// +-------+

structLevel1.printSchema
// root
//  |-- a: struct (nullable = true)
//  |    |-- a: integer (nullable = true)
//  |    |-- b: integer (nullable = true)
//  |    |-- c: integer (nullable = true)

// add new field to top level struct
structLevel1.withColumn("a", 'a.withField("d", lit(4))).show
// +----------+
// |         a|
// +----------+
// |[1,, 3, 4]|
// +----------+

// replace field in top level struct
structLevel1.withColumn("a", 'a.withField("b", lit(2))).show
// +---------+
// |        a|
// +---------+
// |[1, 2, 3]|
// +---------+

// rename field in top level struct
structLevel1.withColumn("a", 'a.withFieldRenamed("b", "z")).printSchema
// root
//  |-- a: struct (nullable = true)
//  |    |-- a: integer (nullable = true)
//  |    |-- z: integer (nullable = true)
//  |    |-- c: integer (nullable = true)

// drop field in top level struct
structLevel1.withColumn("a", 'a.dropFields("b")).show
// +------+
// |     a|
// +------+
// |[1, 3]|
// +------+
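Each of these methods returns a Column, so they can be chained. For example, a small sketch (using the same structLevel1 DataFrame) that adds, renames, and drops fields in a single expression; the expected schema shown in the comments is approximate:

structLevel1.withColumn("a",
  'a.withField("d", lit(4)).withFieldRenamed("b", "z").dropFields("c")).printSchema
// root
//  |-- a: struct (nullable = true)
//  |    |-- a: integer (nullable = true)
//  |    |-- z: integer (nullable = true)
//  |    |-- d: integer (nullable = false)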

You can also use these methods to manipulate fields in nested StructType columns:

// Generate some example data  
val structLevel2 = spark.createDataFrame(sc.parallelize(
    Row(Row(Row(1, null, 3))) :: Nil),
    StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("a", StructType(Seq(
          StructField("a", IntegerType),
          StructField("b", IntegerType),
          StructField("c", IntegerType)))))))))).cache
          
structLevel2.show
// +---------+
// |        a|
// +---------+
// |[[1,, 3]]|
// +---------+

structLevel2.printSchema
// |-- a: struct (nullable = true)
// |    |-- a: struct (nullable = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- b: integer (nullable = true)
// |    |    |-- c: integer (nullable = true)

// add new field to nested struct
structLevel2.withColumn("a", 'a.withField(
  "a", $"a.a".withField("d", lit(4)))).show
// +------------+
// |           a|
// +------------+
// |[[1,, 3, 4]]|
// +------------+

// replace field in nested struct
structLevel2.withColumn("a", $"a".withField(
  "a", $"a.a".withField("b", lit(2)))).show
// +-----------+
// |          a|
// +-----------+
// |[[1, 2, 3]]|
// +-----------+
    
// rename field in nested struct
structLevel2.withColumn("a", 'a.withField(
  "a", $"a.a".withFieldRenamed("b", "z"))).printSchema
// |-- a: struct (nullable = true)
// |    |-- a: struct (nullable = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- z: integer (nullable = true)
// |    |    |-- c: integer (nullable = true)
    
// drop field in nested struct
structLevel2.withColumn("a", 'a.withField(
  "a", $"a.a".dropFields("b"))).show
// +--------+
// |       a|
// +--------+
// |[[1, 3]]|
// +--------+

You can also manipulate deeply nested StructType columns using the patterns above, but it can be a little tedious to write out the full chain. For this scenario, the library also provides a helper method, add_struct_field. You can use this method to add, rename, and drop deeply nested fields as shown below:

// Generate some example data  
val structLevel3 = spark.createDataFrame(sc.parallelize(
    Row(Row(Row(Row(1, null, 3)))) :: Nil),
    StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("a", StructType(Seq(
          StructField("a", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType))))))))))))).cache
              
structLevel3.show
//+-----------+
//|          a|
//+-----------+
//|[[[1,, 3]]]|
//+-----------+

structLevel3.printSchema
//root
// |-- a: struct (nullable = true)
// |    |-- a: struct (nullable = true)
// |    |    |-- a: struct (nullable = true)
// |    |    |    |-- a: integer (nullable = true)
// |    |    |    |-- b: integer (nullable = true)
// |    |    |    |-- c: integer (nullable = true)

// add new field to deeply nested struct
structLevel3.withColumn("a", add_struct_field("a.a.a", "d", lit(4))).show
// +--------------+                                                                
// |             a|
// +--------------+
// |[[[1,, 3, 4]]]|
// +--------------+

// replace field in deeply nested struct
structLevel3.withColumn("a", add_struct_field("a.a.a", "b", lit(2))).show
// +-------------+
// |            a|
// +-------------+
// |[[[1, 2, 3]]]|
// +-------------+
    
// rename field in deeply nested struct
structLevel3.withColumn("a", add_struct_field("a.a", "a", $"a.a.a".withFieldRenamed("b", "z"))).printSchema
// root
//  |-- a: struct (nullable = true)
//  |    |-- a: struct (nullable = true)
//  |    |    |-- a: struct (nullable = true)
//  |    |    |    |-- a: integer (nullable = true)
//  |    |    |    |-- z: integer (nullable = true)
//  |    |    |    |-- c: integer (nullable = true)

// drop field in deeply nested struct
structLevel3.withColumn("a", add_struct_field("a.a", "a", $"a.a.a".dropFields("b"))).show
// +----------+
// |         a|
// +----------+
// |[[[1, 3]]]|
// +----------+

// add, rename, and drop fields in deeply nested struct
val result = structLevel3.withColumn("a", add_struct_field("a.a", "a", $"a.a.a".dropFields("b").withFieldRenamed("c", "b").withField("c", lit(4))))
result.show
// +-------------+
// |            a|
// +-------------+
// |[[[1, 3, 4]]]|
// +-------------+

result.printSchema
// root
//  |-- a: struct (nullable = true)
//  |    |-- a: struct (nullable = true)
//  |    |    |-- a: struct (nullable = true)
//  |    |    |    |-- a: integer (nullable = true)
//  |    |    |    |-- b: integer (nullable = true)
//  |    |    |    |-- c: integer (nullable = false)
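For comparison, the first add_struct_field example above (adding field d three levels deep) can also be written as a plain withField chain. This sketch should produce the same result and illustrates the boilerplate that add_struct_field is meant to save you from:

structLevel3.withColumn("a", 'a.withField(
  "a", $"a.a".withField(
    "a", $"a.a.a".withField("d", lit(4))))).show
// +--------------+
// |             a|
// +--------------+
// |[[[1,, 3, 4]]]|
// +--------------+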

Another common use-case is to perform these operations on arrays of structs. To do this using the Scala APIs, we recommend combining the functions in this library with the functions provided in spark-hofs:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._   // for lit
import org.apache.spark.sql.types._
import za.co.absa.spark.hofs._
import com.github.fqaiser94.mse.methods._

// Generate some example data
val arrayOfStructs = spark.createDataFrame(sc.parallelize(
    Row(List(Row(1, null, 3), Row(4, null, 6))) :: Nil),
    StructType(Seq(
      StructField("array", ArrayType(
        StructType(Seq(
          StructField("a", IntegerType),
          StructField("b", IntegerType), 
          StructField("c", IntegerType)))))))).cache
          
arrayOfStructs.show
// +------------------+
// |             array|
// +------------------+
// |[[1,, 3], [4,, 6]]|
// +------------------+

arrayOfStructs.printSchema
// root
//  |-- array: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a: integer (nullable = true)
//  |    |    |-- b: integer (nullable = true)
//  |    |    |-- c: integer (nullable = true)

// add new field to each struct element of array 
arrayOfStructs.withColumn("array", transform($"array", elem => elem.withField("d", lit("hello")))).show(false)
// +--------------------------------+
// |array                           |
// +--------------------------------+
// |[[1,, 3, hello], [4,, 6, hello]]|
// +--------------------------------+

// replace field in each struct element of array
arrayOfStructs.withColumn("array", transform($"array", elem => elem.withField("b", elem.getField("a") + 1))).show(false)
// +----------------------+
// |array                 |
// +----------------------+
// |[[1, 2, 3], [4, 5, 6]]|
// +----------------------+

// rename field in each struct element of array
arrayOfStructs.withColumn("array", transform($"array", elem => elem.withFieldRenamed("b", "z"))).printSchema
// root
//  |-- array: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a: integer (nullable = true)
//  |    |    |-- z: integer (nullable = true)
//  |    |    |-- c: integer (nullable = true)

// drop field in each Struct element of array
arrayOfStructs.withColumn("array", transform($"array", elem => elem.dropFields("b"))).show(false)
// +----------------+
// |array           |
// +----------------+
// |[[1, 3], [4, 6]]|
// +----------------+
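You can also combine several field operations inside a single transform lambda. Here is a sketch using the same arrayOfStructs DataFrame (the new field d and the arithmetic are purely illustrative):

arrayOfStructs.withColumn("array", transform($"array", elem =>
  elem.dropFields("b").withField("d", elem.getField("a") + elem.getField("c")))).show(false)
// expected: [[1, 3, 4], [4, 6, 10]]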

SQL installation and usage

The underlying Catalyst Expressions are SQL-compatible. Unfortunately, Spark only added public APIs for plugging custom Catalyst Expressions into the FunctionRegistry in Spark 3.0.0 (which, at the time of writing, is still in preview). You can find a project with an example of how to do this here.
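Purely as a hypothetical sketch of what SQL usage could look like once such registration is wired up on Spark 3.x (the function name add_fields is taken from the physical plans shown in the next section; the names actually registered depend on the registration code you plug in):

structLevel1.createOrReplaceTempView("struct_table")
spark.sql("SELECT add_fields(a, 'd', 4) AS a FROM struct_table").show
// expected to match the 'a.withField("d", lit(4)) example above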

Catalyst Optimization Rules

We also provide some Catalyst optimization rules that can be plugged into a Spark session to get even better performance. This is as simple as including the following two lines of code at the start of your Scala Spark program:

import org.apache.spark.sql.catalyst.optimizer.SimplifyStructExpressions
spark.experimental.extraOptimizations = SimplifyStructExpressions.rules

Spark will use these optimization rules to internally rewrite queries in a more optimal fashion. For example, consider the following query and its corresponding physical plan:

val query = structLevel1.withColumn("a", 'a.withField("d", lit(4)).withField("e", lit(5)))

query.explain
// == Physical Plan ==
// *(1) Project [add_fields(add_fields(a#1, d, 4), e, 5) AS a#32343]
// +- InMemoryTableScan [a#1]
//       +- InMemoryRelation [a#1], StorageLevel(disk, memory, deserialized, 1 replicas)
//             +- Scan ExistingRDD[a#1]

If we add the SimplifyStructExpressions.rules to our Spark session, we see a slightly different physical plan for the same query:

import org.apache.spark.sql.catalyst.optimizer.SimplifyStructExpressions
spark.experimental.extraOptimizations = SimplifyStructExpressions.rules

query.explain
// == Physical Plan ==
// *(1) Project [add_fields(a#1, d, e, 4, 5) AS a#32343]
// +- InMemoryTableScan [a#1]
//       +- InMemoryRelation [a#1], StorageLevel(disk, memory, deserialized, 1 replicas)
//             +- Scan ExistingRDD[a#1]

As you can see, the successive add_fields expressions have been collapsed into a single add_fields expression.

Theoretically, this should improve performance, but in most cases you won't notice much difference unless you're doing particularly intense struct manipulation and/or working with a particularly large dataset.
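If you want to check for yourself, a rough, hypothetical way to eyeball the difference is to time the same heavy chain of withField calls with and without the extra optimizations; timings will of course vary with data size and environment:

// simple wall-clock timer
def time[T](block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"took ${(System.nanoTime() - start) / 1e9} s")
  result
}

// build a long chain of withField calls on the struct column
val heavyColumn = (1 to 100).foldLeft(col("a"))((c, i) => c.withField(s"f$i", lit(i)))

time { structLevel1.withColumn("a", heavyColumn).collect() }  // without the extra rules

spark.experimental.extraOptimizations = SimplifyStructExpressions.rules
time { structLevel1.withColumn("a", heavyColumn).collect() }  // with the extra rules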

Unfortunately, to the best of our knowledge, there is currently no way to plug in custom Catalyst optimization rules directly using the Python APIs.

Questions/Thoughts/Concerns?

Feel free to submit an issue.

Instructions for deploying a new release

1. Increment the version number in python/setup.py
2. Create a new release with an appropriately incremented tag

mse's Issues

Cannot operate on a struct inside an array

Consider the following spark-shell session, which has loaded the latest published version of both this library and spark-hofs.

import com.github.fqaiser94.mse.methods._
import za.co.absa.spark.hofs._
import spark.implicits._

val jsonText = """{
     |   "data1": [
     |     {
     |       "vlan": {
     |         "id": "195",
     |         "name": "Subnet-54.14.195"
     |       }
     |     },
     |     {
     |       "vlan": {
     |         "id": "195",
     |         "name": "Subnet-54.14.193"
     |       }
     |     }
     |   ]
     | }""".stripMargin

val df = spark.read.option("multiline", "true").json(Seq(jsonText).toDS())

// this works fine
df.withColumn("data1", transform($"data1", col => col.withFieldRenamed("vlan", "vlan_renamed"))).printSchema

// but what if I want to rename "name" to "device_name"?
// I thought it would be something like this
df.withColumn("data1", transform($"data1", col => col.withField("vlan", $"data1.vlan".withFieldRenamed("name", "device_name")))).printSchema
// but it fails with:
org.apache.spark.sql.AnalysisException: cannot resolve 'rename_fields(`data1`.`vlan`, 'name', 'device_name')' due to data type mismatch: Only struct is allowed to appear at first position, got: array.;;
'Project [transform(data1#44, lambdafunction(add_fields(lambda elm#58, vlan, rename_fields(data1#44.vlan, name, device_name)), lambda elm#58, false)) AS data1#57]
+- LogicalRDD [data1#44], false

I believe that the problem might be here. If this is an array, then we should descend into the elementType to see if it's a StructType. Does this seem on the right track, or am I off the mark?
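For reference, a sketch of a possible workaround following the array-of-structs pattern from the README above: reference the lambda element itself (rather than the top-level data1 column) when rebuilding the vlan field, so the rename is applied per array element:

df.withColumn("data1", transform($"data1", elem =>
  elem.withField("vlan", elem.getField("vlan").withFieldRenamed("name", "device_name")))).printSchema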

SyntaxError: invalid syntax

Hello All,
I am using spark-submit to submit a job that modifies a nested struct column by adding a new field using withField().

Following is the spark-submit command:

spark-submit --master local --packages com.github.fqaiser94:mse_2.11:0.2.4 --py-files mse.zip write_data.py

I have zipped mse since I don't want to install it globally, and I am also going to execute it in AWS EMR.

I am getting the following error:

Traceback (most recent call last):
File "/Users/fkj/DataLakeManagement/archive/write_data.py", line 3, in
from mse import *
File "/Users/fkj/DataLakeManagement/archive/mse.zip/mse/init.py", line 1, in
File "/Users/fkj/DataLakeManagement/archive/mse.zip/mse/methods.py", line 5
def __withField(self: Column, fieldName: str, fieldValue: Column):
^
SyntaxError: invalid syntax

Following is my mse.zip:

mse.zip

write_data.py:

from datetime import datetime

from mse import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StringType, TimestampType, IntegerType


def spark():
    """
    This function is invoked to create the Spark Session.

    :return: the spark session
    """
    spark_session = (SparkSession
                     .builder
                     .appName("Data_Experimentation_Framework")
                     .getOrCreate())

    spark_session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    return spark_session


address = StructType().add("state", StringType())
schema = StructType().add("_id", IntegerType()) \
    .add("employer", StringType()) \
    .add("created_at", TimestampType()) \
    .add("name", StringType())\
    .add("address", address)

employees = [{'_id': 1,
              'employer': 'Microsoft',
              'created_at': datetime.now(),
              'name': 'Noel',
              'address': {
                  "state": "Pennsylvania"
              }

              },
             {'_id': 2,
              'employer': 'Apple',
              'created_at': datetime.now(),
              'name': 'Steve',
              'address': {
                  "state": "New York"
              }
              }
             ]
df = spark().createDataFrame(employees, schema=schema)

df.withColumn("address", f.col("address").withField("country", f.lit("USA")))

df.printSchema()

df.write \
    .format("parquet") \
    .mode("append") \
    .save("/Users/felixkizhakkeljose/Downloads/test12")

Could you help me identify what I am doing wrong?

Availability for Spark 3.x

I absolutely love this package! I'm far from being a Spark/Scala pro, but it makes my life so much easier when working with JSONs and transforming them.
However, executing it with Spark 3.1 and Scala 2.12, I am getting the following error:
java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
How can I make it available for Spark 3.x? Thx!
