
SparkGIS

SparkGIS adds GIS functionalities to SparkSQL through:

  • a user-defined type (UDT): GeometryType
  • a class representing values of that type: Geometry
  • a set of user-defined functions (UDFs) and predicates operating on one or two Geometry values

Creating Geometry values

To create values, use factory methods in the Geometry object:

  import Geometry.WGS84
  val pt = Geometry.point(12.5, 14.6)
  val ln = Geometry.line((0,0), (20,50))
  val collection = Geometry.collection(pt, ln)

Each factory method has two argument lists:

  • the first one takes the 2D coordinates describing the geometry (a single pair, several pairs, or several sequences of pairs, depending on the geometry type)
  • the second one is the coordinate reference system id (SRID); an implicit value Geometry.WGS84 is provided for this (see the sketch below)
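
Because the SRID is an implicit parameter, it can also be passed explicitly. A minimal sketch, assuming the SRID is a plain integer, as the implicit Geometry.WGS84 value suggests:

  // passing the SRID explicitly instead of importing the implicit Geometry.WGS84
  val ptMercator = Geometry.point(1354000.0, 1689000.0)(3857) // 3857 = Web Mercator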

You can also create Geometry values from WKB (Well Known Binary), WKT (Well Known Text), and GeoJSON formats:

  val mp = Geometry.fromString("MULTIPOINT ((1 1), (2 2))")
  val ml = Geometry.fromGeoJSON("{\"type\":\"MultiLineString\",\"coordinates\":[[[12,13],[15,20]],[[7,9],[11,17]]]}")

Defining table schemas

Simply use the GeometryType instance as a type:

  val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("geo", GeometryType.Instance)
  ))

Creating RDDs

The GeometryType is able to produce Geometry values from any supported serialization format (WKB, WKT, GeoJSON) as well as from schema-less JSON RDDs. Simply load your data and apply the schema as shown below:

  // using GeoJSON
  val data = Seq(
    "{\"id\":1,\"geo\":{\"type\":\"Point\",\"coordinates\":[1,1]}}",
    "{\"id\":2,\"geo\":{\"type\":\"LineString\",\"coordinates\":[[12,13],[15,20]]}}",
    "{\"id\":3,\"geo\":{\"type\":\"MultiLineString\",\"coordinates\":[[[12,13],[15,20]],[[7,9],[11,17]]]}}",
    ...
  )
  val rdd = sc.parallelize(data)
  val df = sqlContext.jsonRDD(rdd, schema)

  // or other means
  val data = Seq(
    Row(1, Geometry.point(1,1)),
    Row(2, Geometry.fromString("MULTIPOINT ((1 1), (2 2))")),
    ...
  )
  val rdd = sc.parallelize(data)
  val df = sqlContext.createDataFrame(rdd, schema)

Using functions

Each function is defined as a method of the Functions object and can be called in any suitable context (see the sketch below the query example). Moreover, the functions can be registered in the SQLContext and used inside SparkSQL queries:

  Functions.register(sqlContext)
  df.registerTempTable("features")
  val result = sqlContext.sql("SELECT ST_Length(geo) FROM features")
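
The same functions can be invoked directly as plain methods, outside of SQL. A minimal sketch, assuming ST_Length accepts a Geometry and returns a numeric length:

  // calling a UDF directly on a Geometry value
  import Geometry.WGS84
  val ln = Geometry.line((0.0, 0.0), (3.0, 4.0))
  val len = Functions.ST_Length(ln)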

Using geometry methods

GIS functions are just aliases for methods of the class hierarchy rooted at GisGeometry. This hierarchy wraps GeoTools classes to provide a consistent interface, e.g. returning options instead of magic values, or modeling the absence of a property for certain geometry types.

An instance of GisGeometry is wrapped by the SparkSQL Geometry type; the easiest way to access it and invoke its methods is by importing Geometry.ImplicitConversions:

  import Geometry.ImplicitConversions._

  val l = Geometry.line((10.0,10.0), ...)
  if (!l.isEmpty) {
    val p: Option[Geometry] = l.startPoint
    ....
  }

Some methods are also aliased as operators by the GeometryOperators implicit class:

  import GeometryOperators._
  import Geometry.ImplicitConversions._

  val l1 = Geometry.line((10.0,10.0), (20.0,20.0), (10.0,30.0))
  val l2 = Geometry.line((20.0,20.0), (30.0,30.0), (40.0,40.0))
  if ((l1 <-> l2) < 50.0) { // distance less than 50
    ...
  }

Build, test and doc

The project uses Maven as its build system. Install Maven 3 if you don't have it, cd into your SparkGIS directory, and run:

  mvn package -DskipTests
  mvn test
  mvn scala:doc

This produces the jar under the target directory, runs all available tests, and generates the documentation under target/site/scaladocs.

Credits

The Geometry value class is written on top of the GeoTools library.

The UDFs aim to adhere to the OGC Simple Feature Access recommendation. The names and documentation of the GIS functions are taken from PostGIS.

Remarks

Working with JSON RDDs (jsonRDD) requires Spark >= 1.4.

Changelog

0.3.0

  • Abandoned ESRI Geometry library in favor of GeoTools
  • Moved to Scala 2.11
  • Moved to Spark 1.6
  • Added ST_Perimeter
  • Added tolerance argument to ST_Simplify
  • SRID in factory methods is now an implicit argument

Contributors

  drubbo, estoianovici

sparkgis's Issues

java.lang.NullPointerException on ST_Buffer

I got this exception:

16/04/06 03:49:09 INFO GenerateUnsafeProjection: Code generated in 190.332219 ms
16/04/06 03:49:10 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NullPointerException
at org.betterers.spark.gis.GisGeometry$ImplicitConversions$.fromGeometry(GisGeometry.scala:389)
at org.betterers.spark.gis.Geometry$ImplicitConversions$.toImpl(Geometry.scala:94)
at org.betterers.spark.gis.udf.Functions$.ST_Buffer(Functions.scala:232)
at SparkGis$$anonfun$1.apply(SparkGis.scala:42)
at SparkGis$$anonfun$1.apply(SparkGis.scala:42)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

when I run this piece of code:

import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.betterers.spark.gis._
import org.betterers.spark.gis.udf.Functions

object SparkGis {
  val conf = new SparkConf()
    .setAppName("Spark Gis Test")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  def main(args: Array[String]) {
    val layerSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("geometry", GeometryType.Instance)))

    val polygons = sqlContext.read.format("json")
      .schema(layerSchema)
      .load("data/countries.json")

    val points = sqlContext.read.format("json")
      .schema(layerSchema)
      .load("data/points.json")

    bufferDistanceExample(polygons, points)
  }

  def bufferDistanceExample(poly: DataFrame, points: DataFrame): Unit = {
    poly.registerTempTable("polygons")

    val pointsBuffer = poly.map(c => Functions.ST_Buffer(c.getAs[Geometry](1), 10))
    pointsBuffer.foreach(println)

    val pointsBufferRow = pointsBuffer.map(x => org.apache.spark.sql.Row(x))
    pointsBufferRow.foreach(println)

    val pointsBufferDf = sqlContext.createDataFrame(
      pointsBufferRow,
      StructType(Seq(StructField("geometry", GeometryType.Instance))))
    pointsBufferDf.show()

    pointsBufferDf.registerTempTable("pointsBuffer")
    val bufferJoin = pointsBufferDf.sqlContext.sql(
      "select p.geometry, pol.geometry from pointsBuffer p join polygons pol")
    bufferJoin.show()
  }
}

with this data:
data.zip

I can see the geometries, but when I try to join them and print the result, I get this exception.
When I run it without the buffer section, it works.
The data is valid, because before the join I can print the features of the layers just fine.
In addition, I compiled the package (SparkGIS) against Scala 2.10.5 instead of using Scala 2.11 (all the tests pass).

Resubmit to Spark Packages

Hi,

Thank you very much for your submission to Spark Packages! Due to an internal error, your process wasn't completed and your submission didn't go through. We fixed the problem now, could you please resubmit the package?

Thanks,
Burak

Better GeometryType.deserialize

When deserializing from Map, we create a GeoJSON and deserialize it back to geometry.
We can do better by looking for particular keys and values.
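
A hypothetical sketch of what that could look like; fromMap and toGeoJsonString below are illustrative names, not the actual API:

  import Geometry.WGS84

  // hypothetical: build a Point directly from the GeoJSON-shaped Map,
  // falling back to the current GeoJSON round trip for other types
  def fromMap(m: Map[String, Any]): Geometry = m.get("type") match {
    case Some("Point") =>
      val Seq(x, y) = m("coordinates").asInstanceOf[Seq[Double]]
      Geometry.point(x, y)
    case _ =>
      Geometry.fromGeoJSON(toGeoJsonString(m)) // toGeoJsonString: placeholder for the existing writer
  }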

[Spark 1.6.0]ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

I read two GeoJSON files (lines and points) as DataFrames,
then I created a buffer DataFrame from the points.
I can see the geometries, but when I try to join them and print the result, I get this exception.
The data is valid, because before the join I can print the features of the layers just fine.
I tried adding option("charset", "UTF8") and also UTF16.
I saw the same error reported in another package:
https://github.com/databricks/spark-csv/issues/206
https://github.com/databricks/spark-csv/issues/64

I am running it on Windows, but connecting to a remote HDFS.
In addition, I compiled the package (SparkGIS) against Scala 2.10.5 instead of using Scala 2.11 (all the tests pass).

Compatibility with Spark 2.X

It would be nice to create a version that works with Spark 2.X

I tried to make it work, but it doesn't compile, because in Spark 2.X UserDefinedType is private.

Check this: SPARK-7768

Built-in functions instead of / alongside UDFs

The SparkSQL parser can use a fallback parser when it's not able to recognize parts of the query.
We can leverage this and provide the functions as built-ins instead of UDFs; performance should improve.

Extract geometry operators into separate library

I came across this project looking not for Spark, but for a Scala wrapper / library for geometric operations.

Would it make sense to separate the geometry library from the Spark integration (UDFs, ...) so it could be used on its own?
