
osmesa's Introduction

OSMesa

Join the chat at https://gitter.im/osmesa/Lobby

This project is a collection of tools for working with OpenStreetMap (OSM). It is built to enable large-scale batch analytic jobs to run on the latest OSM data, as well as streaming jobs which operate on updates delivered via minutely replication files.

The command-line apps that perform these batch and streaming jobs live in the apps subproject. Reusable components that can be used to construct your own batch and stream processors live in the analytics subproject.

Getting Started

This library is a toolkit meant to make the munging and manipulation of OSM data a simpler affair than it would otherwise be. Nevertheless, a significant degree of domain-specific knowledge is necessary to profitably work with OSM data. Prospective users would do well to study the OSM data-model and to develop an intuitive sense for how the various pieces of the project hang together to enable an open-source, globe-scale map of the world.

If you're already fairly comfortable with OSM's model, running one of the diagnostic (console printing/debugging) Spark Streaming applications provided in the apps subproject is probably the quickest way to explore Spark SQL and its usage within this library. To run the change stream processor application from the beginning of (OSM) time until cluster failure or user termination, try this:

# head into the 'src' directory
cd src

# build the jar we'll be submitting to spark
sbt "project apps" assembly

# submit the streaming application to spark for process management
spark-submit \
  --class osmesa.apps.streaming.ChangeStreamProcessor \
  ./apps/target/scala-2.11/osmesa-apps.jar \
  --start-sequence 1
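
For a rough picture of what such a diagnostic consumer does, here is a minimal Spark Structured Streaming sketch. The source short name ("changes") used here is an assumption standing in for OSMesa's actual streaming source registration; the start_sequence option mirrors the --start-sequence flag above.

import org.apache.spark.sql.SparkSession

object ChangeStreamConsoleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("change-stream-console")
      .getOrCreate()

    // Assumed source name and option; see the apps subproject for the real entry points.
    val changes = spark.readStream
      .format("changes")
      .option("start_sequence", "1")
      .load()

    // Print each micro-batch to the console for debugging.
    changes.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}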

Deployment

Utilities are provided in the deployment directory to bring up a cluster and push the OSMesa apps jar to it. The spawned EMR cluster comes with Apache Zeppelin enabled, which allows jars to be registered/loaded for a console-like experience similar to Jupyter or IPython notebooks, but which executes Spark jobs across the entire cluster. Actually wiring up Zeppelin to use OSMesa sources is beyond the scope of this document, but it is a relatively simple configuration.

Statistics

OSMesa supports the following summary statistics, aggregated at the user and hashtag level (a classification sketch follows the list):

  • Number of added buildings (building=*, version=1)
  • Number of modified buildings (building=*, version > 1 || minorVersion > 0)
  • Number of deleted buildings (building=*, visible == false)
  • Number of added roads (highway=*, version=1)
  • Number of modified roads (highway=*, version > 1 || minorVersion > 0)
  • Number of deleted roads (highway=*, visible == false)
  • km of added roads (highway=*, version=1)
  • km of modified roads (highway=*, version > 1 || minorVersion > 0)
  • km of deleted roads (highway=*, visible == false)
  • Number of added waterways (waterway={river,riverbank,canal,stream,stream_end,brook,drain,ditch,dam,weir,waterfall,pressurised}, version=1)
  • Number of modified waterways (waterway={river,riverbank,canal,stream,stream_end,brook,drain,ditch,dam,weir,waterfall,pressurised}, version > 1 || minorVersion > 0)
  • Number of deleted waterways (waterway={river,riverbank,canal,stream,stream_end,brook,drain,ditch,dam,weir,waterfall,pressurised}, version > 1 || minorVersion > 0)
  • km of added waterways (waterway=*, version=1)
  • km of modified waterways (waterway=*, version > 1 || minorVersion > 0)
  • km of deleted waterways (waterway=*, version > 1 || minorVersion > 0)
  • Number of added coastlines (natural=coastline, version=1)
  • Number of modified coastlines (natural=coastline, version > 1 || minorVersion > 0)
  • Number of deleted coastlines (natural=coastline, visible == false)
  • km of added coastline (natural=coastline, version=1)
  • km of modified coastline (natural=coastline, version > 1 || minorVersion > 0)
  • km of deleted coastline (natural=coastline, visible == false)
  • Number of added points of interest ({amenity,shop,craft,office,leisure,aeroway}=*, version=1)
  • Number of modified points of interest ({amenity,shop,craft,office,leisure,aeroway}=*, version > 1 || minorVersion > 0)
  • Number of deleted points of interest ({amenity,shop,craft,office,leisure,aeroway}=*, visible == false)
  • Number of added "other" (not otherwise tracked, but tagged in OSM) features (not otherwise captured, version=1)
  • Number of modified "other" features (not otherwise captured, version > 1 || minorVersion > 0)
  • Number of deleted "other" features (not otherwise captured, visible == false)

SQL Tables

Statistics calculation, whether batch or streaming, updates a few tables that jointly can be used to discover user or hashtag stats. These are the schemas of the tables being updated.

These tables are fairly normalized and thus not the most efficient for directly serving statistics. If that's your goal, it might be useful to create materialized views for any further aggregation. A couple of example queries that can serve as views are provided: hashtag_statistics and user_statistics.

Batch

  • ChangesetStatsCreator will load aggregated statistics into a PostgreSQL database using JDBC.

  • MergeChangesets will update a changesets ORC file with the contents of the provided changeset stream.

Stream

Vector Tiles

Vector tiles, too, are generated both in batch and via streaming, so that a fresh set can be quickly generated and then kept up to date. Summary vector tiles are produced for two cases: to illustrate the scope of a user's contribution and to illustrate the scope of a hashtag/campaign within OSM.

Batch

  • FootprintCreator produces a z/x/y stack of vector tiles corresponding to all changes marked with a given hashtag or user, depending on the CLI options provided.

Stream

  • HashtagFootprintUpdater updates a z/x/y stack of vector tiles corresponding to all changes marked with a given hashtag
  • UserFootprintUpdater updates a z/x/y stack of vector tiles which correspond to a user's modifications to OSM

osmesa's People

Contributors

cloudniner, echeipesh, fosskers, gitter-badger, guidorice, jamesmcclain, jpolchlo, lossyrob, mojodna, moradology


osmesa's Issues

Run stats updaters and watch for crashes

Driven by a) AugmentedDiffSource and b) ChangesetsSource.

Augmented diff-related crashes should be ameliorated by #76. Changeset-related crashes need to be investigated more.

User footprints may be inconsistent after updates

After running the footprint updaters as a batch (vs. streaming) process using data generated by #106, adjacent tiles appear to have inconsistent data. This may be a caching issue, but it warrants further investigation to determine whether there's a race condition that overwrites tile data incorrectly.

Augmented Diff Generation

Since #25 covers a number of different, intertwined pieces, I wanted to break out the augmented diff generation component.

Overview

Augmented diffs are intended to provide both elements that were actually modified during a time period (the timestamp attribute within the <new> element should be within the requested window; see Caveats below) as well as elements which refer to them. In the case of modifications (indirect or direct) to ways or relations, member (way nds or relation members) element metadata (coordinates, authorship, etc.) is inlined to avoid the need for additional lookups.

Output Format(s)

Row-based

The simplest format that likely meets our needs is row-based, such that it can be combined with the intermediate format containing reconstructed way geometries.

Fields needed are:

  • type
  • id
  • geom (reconstructed in the case of ways and relations)
  • version
  • minorVersion (this may not be necessary, as it can be re-calculated using a window function provided suitable updated values)
  • updated (direct or indirect timestamp)
  • changeset (changeset which directly or indirectly modified the element)
  • uid
  • user

Overpass Augmented OsmChange

For compatibility with Overpass-generated augmented diffs (which extend the OsmChange format, as used by minutely diffs, swapping the root <osmChange> for <osm>), OSMesa should match that format.

Overpass augmented diff sequences can be converted to Unix timestamps using <sequence> * 60 + 1347432900 (e.g. sequence 2801986 corresponds to 2018-01-10T02:41:00.000Z).
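
A quick sketch of that conversion in Scala (the example sequence matches the one used below):

import java.time.Instant

def sequenceToInstant(sequence: Long): Instant =
  Instant.ofEpochSecond(sequence * 60 + 1347432900L)

// sequenceToInstant(2801986L) == Instant.parse("2018-01-10T02:41:00Z")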

Example

Simplified version of sequence 2801986:

<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="Overpass API 0.7.54.13 ff15392f">
<note>The data included in this document is from www.openstreetmap.org. The data is made available under ODbL.</note>
<meta osm_base="2018-03-19T22:34:02Z"/>

<action type="modify">
<old>
  <node id="182611666" lat="38.5466070" lon="-75.7592310" version="2" timestamp="2009-09-29T12:01:08Z" changeset="2675490" uid="147510" user="woodpeck_fixbot"/>
</old>
<new>
  <node id="182611666" lat="38.5465913" lon="-75.7593007" version="3" timestamp="2018-01-10T02:41:50Z" changeset="55309654" uid="1231505" user="fortchagos"/>
</new>
</action>
<!-- ... -->
<action type="delete">
<old>
  <node id="3357186352" lat="-7.2006844" lon="113.2426484" version="1" timestamp="2015-02-18T03:36:36Z" changeset="28924003" uid="617220" user="raniedwianugrah"/>
</old>
<new>
  <node id="3357186352" visible="false" version="2" timestamp="2018-01-10T02:41:43Z" changeset="55309655" uid="2294556" user="anisa berliana"/>
</new>
</action>
<!-- ... -->
<action type="create">
  <node id="5330324023" lat="-27.4891386" lon="-58.7847889" version="1" timestamp="2018-01-10T02:41:36Z" changeset="55309653" uid="7435203" user="Mirkox30">
    <tag k="amenity" v="pharmacy"/>
    <tag k="name" v="Farmacia"/>
  </node>
</action>
<!-- ... -->
<action type="modify">
<old>
  <way id="17632925" version="2" timestamp="2014-11-30T18:13:33Z" changeset="27137865" uid="105946" user="ElliottPlack">
    <bounds minlat="38.5463900" minlon="-75.7608930" maxlat="38.5476470" maxlon="-75.7592310"/>
    <nd ref="182611666" lat="38.5466070" lon="-75.7592310"/>
    <nd ref="182611669" lat="38.5465580" lon="-75.7593610"/>
    <nd ref="182611672" lat="38.5465010" lon="-75.7594450"/>
    <nd ref="182611676" lat="38.5464360" lon="-75.7595060"/>
    <nd ref="182611679" lat="38.5463980" lon="-75.7595820"/>
    <nd ref="182611683" lat="38.5463900" lon="-75.7596350"/>
    <nd ref="182611687" lat="38.5464090" lon="-75.7597800"/>
    <nd ref="182611690" lat="38.5465390" lon="-75.7599710"/>
    <nd ref="182611694" lat="38.5468330" lon="-75.7602460"/>
    <nd ref="182611697" lat="38.5470580" lon="-75.7604820"/>
    <nd ref="182611699" lat="38.5471190" lon="-75.7605200"/>
    <nd ref="182611701" lat="38.5472940" lon="-75.7605660"/>
    <nd ref="182611702" lat="38.5475500" lon="-75.7606650"/>
    <nd ref="182611704" lat="38.5476070" lon="-75.7607260"/>
    <nd ref="182611707" lat="38.5476470" lon="-75.7608930"/>
    <tag k="tiger:reviewed" v="no"/>
    <tag k="highway" v="service"/>
    <tag k="service" v="driveway"/>
    <tag k="tiger:cfcc" v="A41"/>
    <tag k="tiger:county" v="Dorchester, MD"/>
  </way>
</old>
<new>
  <way id="17632925" version="3" timestamp="2018-01-10T02:41:51Z" changeset="55309654" uid="1231505" user="fortchagos">
    <bounds minlat="38.5463900" minlon="-75.7608930" maxlat="38.5476470" maxlon="-75.7593007"/>
    <nd ref="182611666" lat="38.5465913" lon="-75.7593007"/>
    <nd ref="182611669" lat="38.5465580" lon="-75.7593610"/>
    <nd ref="182611672" lat="38.5465010" lon="-75.7594450"/>
    <nd ref="182611676" lat="38.5464360" lon="-75.7595060"/>
    <nd ref="182611679" lat="38.5463980" lon="-75.7595820"/>
    <nd ref="182611683" lat="38.5463900" lon="-75.7596350"/>
    <nd ref="182611687" lat="38.5464090" lon="-75.7597800"/>
    <nd ref="182611690" lat="38.5465390" lon="-75.7599710"/>
    <nd ref="182611694" lat="38.5468330" lon="-75.7602460"/>
    <nd ref="182611697" lat="38.5470580" lon="-75.7604820"/>
    <nd ref="182611699" lat="38.5471190" lon="-75.7605200"/>
    <nd ref="182611701" lat="38.5472940" lon="-75.7605660"/>
    <nd ref="182611702" lat="38.5475500" lon="-75.7606650"/>
    <nd ref="182611704" lat="38.5476070" lon="-75.7607260"/>
    <nd ref="182611707" lat="38.5476470" lon="-75.7608930"/>
    <tag k="highway" v="service"/>
    <tag k="service" v="driveway"/>
    <tag k="tiger:cfcc" v="A41"/>
    <tag k="tiger:county" v="Dorchester, MD"/>
    <tag k="tiger:reviewed" v="yes"/>
  </way>
</new>
</action>
<!-- ... -->
<action type="delete">
<old>
  <way id="328897117" version="1" timestamp="2015-02-18T03:36:37Z" changeset="28924003" uid="617220" user="raniedwianugrah">
    <bounds minlat="-7.2014516" minlon="113.2419865" maxlat="-7.2012318" maxlon="113.2423079"/>
    <nd ref="3357186374" lat="-7.2012318" lon="113.2420205"/>
    <nd ref="3357186375" lat="-7.2012922" lon="113.2423079"/>
    <nd ref="3357186382" lat="-7.2014516" lon="113.2422739"/>
    <nd ref="3357186381" lat="-7.2013912" lon="113.2419865"/>
    <nd ref="3357186374" lat="-7.2012318" lon="113.2420205"/>
    <tag k="M:hazard_prone" v="yes"/>
    <tag k="access:roof" v="no"/>
    <tag k="addr:full" v="Jalan Samsul arifin"/>
    <tag k="amenity" v="school"/>
    <tag k="building" v="yes"/>
    <tag k="building:condition" v="good"/>
    <tag k="building:levels" v="1"/>
    <tag k="building:roof" v="tile"/>
    <tag k="building:structure" v="reinforced_masonry"/>
    <tag k="building:walls" v="bata (brick)1"/>
    <tag k="capacity:persons" v="500"/>
    <tag k="name" v="SMKN2 SAMPANG"/>
    <tag k="operator:type" v="goverment"/>
    <tag k="school:type_idn" v="SMK"/>
    <tag k="water_supply" v="pipeline"/>
  </way>
</old>
<new>
  <way id="328897117" visible="false" version="2" timestamp="2018-01-10T02:41:43Z" changeset="55309655" uid="2294556" user="anisa berliana"/>
</new>
</action>
<!-- ... -->
<action type="create">
  <way id="552017489" version="1" timestamp="2018-01-10T02:41:03Z" changeset="55309649" uid="83188" user="dannmer">
    <bounds minlat="39.9990851" minlon="-75.3383729" maxlat="39.9997818" maxlon="-75.3372020"/>
    <nd ref="5329383215" lat="39.9997818" lon="-75.3372020"/>
    <nd ref="5330326034" lat="39.9997692" lon="-75.3374422"/>
    <nd ref="5330326035" lat="39.9997486" lon="-75.3376219"/>
    <nd ref="5330326036" lat="39.9997219" lon="-75.3378284"/>
    <nd ref="5330326037" lat="39.9996870" lon="-75.3380510"/>
    <nd ref="5330326038" lat="39.9996438" lon="-75.3382254"/>
    <nd ref="5330326039" lat="39.9995925" lon="-75.3383112"/>
    <nd ref="5330326040" lat="39.9995185" lon="-75.3383380"/>
    <nd ref="5330326041" lat="39.9994034" lon="-75.3383729"/>
    <nd ref="5330326042" lat="39.9992863" lon="-75.3383568"/>
    <nd ref="5330326043" lat="39.9991692" lon="-75.3383434"/>
    <nd ref="5330326044" lat="39.9990851" lon="-75.3383380"/>
    <tag k="highway" v="path"/>
    <tag k="surface" v="unpaved"/>
  </way>
</action>
<!-- ... -->
<action type="create">
  <relation id="7889640" version="1" timestamp="2018-01-10T02:41:49Z" changeset="55309654" uid="1231505" user="fortchagos">
    <bounds minlat="38.5457446" minlon="-75.7590167" maxlat="38.5461621" maxlon="-75.7586478"/>
    <member type="way" ref="521907495" role="inner">
      <nd lat="38.5460902" lon="-75.7589233"/>
      <nd lat="38.5458915" lon="-75.7588504"/>
      <nd lat="38.5458696" lon="-75.7589480"/>
      <nd lat="38.5458491" lon="-75.7589405"/>
      <nd lat="38.5458519" lon="-75.7589281"/>
      <nd lat="38.5457889" lon="-75.7589050"/>
      <nd lat="38.5458391" lon="-75.7586817"/>
      <nd lat="38.5458603" lon="-75.7586895"/>
      <nd lat="38.5458569" lon="-75.7587045"/>
      <nd lat="38.5461179" lon="-75.7588003"/>
      <nd lat="38.5460902" lon="-75.7589233"/>
    </member>
    <member type="way" ref="552017524" role="outer">
      <nd lat="38.5461621" lon="-75.7587751"/>
      <nd lat="38.5458156" lon="-75.7586478"/>
      <nd lat="38.5458047" lon="-75.7586963"/>
      <nd lat="38.5458008" lon="-75.7586949"/>
      <nd lat="38.5457918" lon="-75.7587349"/>
      <nd lat="38.5457743" lon="-75.7587285"/>
      <nd lat="38.5457505" lon="-75.7588347"/>
      <nd lat="38.5457674" lon="-75.7588410"/>
      <nd lat="38.5457591" lon="-75.7588780"/>
      <nd lat="38.5457636" lon="-75.7588797"/>
      <nd lat="38.5457446" lon="-75.7589640"/>
      <nd lat="38.5458880" lon="-75.7590167"/>
      <nd lat="38.5459136" lon="-75.7589026"/>
      <nd lat="38.5461168" lon="-75.7589772"/>
      <nd lat="38.5461621" lon="-75.7587751"/>
    </member>
    <tag k="highway" v="pedestrian"/>
    <tag k="surface" v="concrete"/>
    <tag k="type" v="multipolygon"/>
  </relation>
</action>
</osm>

Element Ordering

Elements should be ordered such that any references to other elements in the same diff appear after the element itself. In practice, this means 1) nodes, 2) ways, 3) relations referencing only nodes + ways, 4) relations referencing relations that previously appeared, 5) ...

Overpass appears to further order by 1) modify, 2) delete, 3) create (although this isn't strictly followed; see the full augmented diff. Timestamps may be taken into account).

Caveats

Overpass aims to answer the question "what changed in the requested time period?" When provided with sequences, they're converted into timestamps (see above) and the minute following is used as the time period being queried.

Due to the way that OSM minutely diffs are created (more context), they won't contain all changes made during the minute they "represent" if database transactions are open. Rather than retroactively updating elements that changed during a time period, the question we should be asking is "what changes did we find out about during the requested time period?" (breaking the assumption of a match between <new> timestamp values and the time associated with a sequence).

Another way to think about this is "what elements were affected by a given minutely diff" and genuinely just augmenting that diff (populating referenced elements and adding referred-to ones) rather than trying to be clever with time ranges.

In the existing Overpass implementation, changeset values for referring elements match the changeset in which the referring element was modified. This introduces subtle bugs, e.g. mapbox/osm-adiff-parser#2, when attempting to aggregate changes by changeset (which is another process that requires updating past knowledge, because changesets are replicated separately and may remain open (collecting additional changes) for 24 hours after first appearing in the minutely diff replication stream).

GeoJSON

Bonus points: a GeoJSON FeatureCollection containing 1 or 2 Features (re-assembled geometries) representing the old and new versions of modified elements (indirect or direct).

JSON

Newline-delimited JSON representation of OsmChange (action may be able to be omitted, as it can be inferred by the presence of <old> or version / visible attributes, at least for non-referring elements), for easy consumption?

Related Work

@geohacker was previously producing JSON augmented diffs aggregated by changeset. The post goes into a bit more detail about edge cases that make augmented diff aggregation difficult.

osm-replication-streams contains a parser for Overpass augmented diffs that outputs GeoJSON as described above.

@kamicut used osm-replication-streams to build a Lambda function that consumes minutely diffs and outputs them as individual messages on a Kinesis stream.

Incrementally update roll-ups instead of refreshing materialized views

Using materialized views allows us to write the raw aggregations (per-changeset) and roll them up periodically from scratch.

There are a couple of substantial downsides to this approach that become more apparent at scale:

  • roll-ups aren't fully up-to-date (materialized views need to be refreshed, at which point they are almost immediately out of date)
  • refreshing materialized views requires a full scan of the source table(s) and may take minutes to hours on larger tables (Missing Maps leaderboards use this approach and can take several hours to update)

Refreshing views on a loop isn't going to work out long-term (and already isn't working for MM).
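
A minimal sketch of the incremental alternative, using a PostgreSQL upsert over JDBC. The table and column names here (user_statistics, uid, buildings_added) are hypothetical, not OSMesa's actual schema:

import java.sql.DriverManager

object IncrementalRollupSketch {
  // Add a delta to a per-user roll-up row, creating the row if it doesn't exist yet.
  def upsertUserStats(jdbcUrl: String, uid: Long, buildingsAdded: Long): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val stmt = conn.prepareStatement(
        """INSERT INTO user_statistics (uid, buildings_added)
          |VALUES (?, ?)
          |ON CONFLICT (uid) DO UPDATE
          |  SET buildings_added = user_statistics.buildings_added + EXCLUDED.buildings_added
          |""".stripMargin)
      stmt.setLong(1, uid)
      stmt.setLong(2, buildingsAdded)
      stmt.executeUpdate()
      stmt.close()
    } finally conn.close()
  }
}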

Support recursive relations when reconstructing geometries

Certain types of relations include other relations as members; 3D buildings, for example, reference other relations for their outlines. In order to reconstruct these, member relations must already have been reconstructed.

Examples of such relations can be found using this Presto query, which will reveal the types of relations that contain relations:

select
  id,
  tags['type'] type,
  filter(members, x -> x.type = 'relation') relation_members
from planet_history
where cardinality(filter(members, x -> x.type = 'relation')) > 0
  and type = 'relation'

This Presto query will show the type (and quantity) of relation members:

SELECT tags['type'], count(*)
FROM planet_history
WHERE type = 'relation'
  AND id IN (
    SELECT ref
    FROM planet_history
    CROSS JOIN unnest(transform(filter(members, x -> x.type = 'relation'), x -> x.ref)) AS t (ref)
    WHERE cardinality(filter(members, x -> x.type = 'relation')) > 0
            AND type = 'relation'
)
GROUP BY tags['type']
ORDER BY count(*) DESC
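
For comparison, a sketch of the first query expressed against a Spark DataFrame of planet history (assumed columns: id, type, tags, and a members array of structs with type/ref/role fields):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Relations whose members include other relations, with the referenced relation IDs.
def relationsContainingRelations(planetHistory: DataFrame): DataFrame =
  planetHistory
    .filter(col("type") === "relation")
    .select(
      col("id"),
      col("tags").getItem("type").as("relation_type"),
      explode(col("members")).as("member"))
    .filter(col("member.type") === "relation")
    .groupBy(col("id"), col("relation_type"))
    .agg(collect_list(col("member.ref")).as("relation_member_refs"))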

Generate statistics with Zeppelin and VectorPipe directly from ORC

Per Changeset stats

  • km of roads edited
  • number of road edits
  • number of buildings edited
  • number of POIs edited (amenity)
  • number of waterway edits
  • km of waterways edited
  • extent of edits
  • countries edited in
  • hashtags participated in

Per Hashtag stats

  • number of participants
  • km of roads edited
  • number of buildings edited
  • number of POIs edited (amenity)
  • km of waterways edited
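
A minimal sketch of computing a couple of the per-changeset counts above directly from a planet ORC file (the path is a placeholder; the columns changeset, type, and tags are assumed to match the ORC layout; the km-of-roads/waterways statistics need reconstructed geometries and are omitted):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ChangesetStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("changeset-stats-sketch").getOrCreate()

    // Placeholder path; point this at a planet or planet-history ORC file.
    val elements = spark.read.orc("s3://<bucket>/planet.orc")

    val perChangeset = elements
      .filter(col("type") === "way")
      .groupBy(col("changeset"))
      .agg(
        sum(when(col("tags").getItem("highway").isNotNull, 1).otherwise(0)).as("road_edits"),
        sum(when(col("tags").getItem("building").isNotNull, 1).otherwise(0)).as("building_edits"),
        sum(when(col("tags").getItem("waterway").isNotNull, 1).otherwise(0)).as("waterway_edits"))

    perChangeset.show(20, false)
  }
}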

Create an AugmentedDiffSource

...similar to ChangesSource, etc.

AugmentedDiffStreamProcessor currently uses Spark Streaming's textFile source, which polls a directory (or bucket prefix) to populate the stream. There's no way to specify a starting point, so all JSON files that exist in the path will be loaded and processed.

We'd pushed off doing this against the interim JSON augmented diffs, but the downsides of the textFile source are becoming painful.

A side-effect of implementing a first-class source is that Row mangling can be done within the source.

Run footprint MVT updaters and watch for crashes

Also improve the story for restarting processes to avoid data loss / duplication.

Hashtag footprints are the most problematic, as the use of stream-stream joins (changes + changesets) results in a lack of clarity as to which replication sequences have actually been processed into MVT changes.

A likely improvement is to create a dummy layer within the MVTs that contains a list of sequences that have been applied (changeset sequences for hashtags, changes for users) in order to prevent re-applying changes.

Planned usage

Hi Seth,

what's the planned usage of osmesa?

Are there any functions not somehow related to statistics?

What about hosting a "How do you contribute?" tool on steroids?

Best wishes

Create deployment OSMesa with Ingest and Zeppelin notebook capabilities

Develop the necessary terraform scripts to bring up an OSMesa stack for the Ingest and Global Analytics - Zeppelin Notebook components.

  • Bring up an ephemeral, spot-instance based cluster that has HBase on S3 enabled
  • Run all necessary GeoMesa configuration against the cluster (only on brand new clusters; other instances of ephemeral clusters pointing at an existing S3 backend do not need to do this)
  • Run an ORC ingest (could be Makefile driven with aws emr add-steps)
  • Tear down the ingest cluster
  • Bring up an Analytic cluster with HBase on S3, pointed at a pre-existing cluster, in read only mode, with the Zeppelin application enabled
  • Place all necessary JARs onto the master node, run a Zeppelin notebook, and do simple tests against the ingested data.

Analytic VectorTiles from OSM History

Generate a layer of Analytic VectorTiles with OSM geometries from OSM history in order to produce them with accurate created/modified timestamp metadata.

Generating Features with Metadata:

In the OSM data model, only Nodes contain spatial data; they may be referred to by Ways to define lines, boundaries, and polygons. Relations may refer to any of the Nodes, Ways, or other Relations. Depending on tag information, some Relations have geographic meaning (e.g. defining a multipolygon with its holes) and others may have only semantic/labelling meaning.

When updates to OSM happen, new records with updated changeset/version information are introduced, but they retain the previous ID, and other records referring to the changed record are not updated.

(Diagram: OSM connected component)

For instance, it is common to change the location of one Node in a Way that is part of a Relation representing a multipolygon, without updating either the Way or the Relation. This change to a constituent point needs to be propagated as a change to the geographic feature.

Proposed Approach

The proposed approach to dealing with this problem is to separate the task of grouping all related OSM records (records are related if one references another by its ID field) from the task of converting the grouped records into a geometric feature with some metadata.

The base assumption is that each group will form a directionally connected component that can fit in memory.
The second stage converts this connected component into a geographic feature with some metadata.
Because the rules for generating features and their metadata are varied, and ultimately application-dependent, keeping them separate from the shuffle logic has organizational benefits.

Here are some properties of these connected components:

  • Multiple records sharing IDs may have multiple versions
  • Multiple records sharing IDs may have different visibility status
  • We will have many points, fewer ways, fewer relations

Note: If the in-memory assumption does not hold, we will have to define conditions for how and when to duplicate records when they are shared by more than one graph.

Notes

The current version of vectorpipe employs a similar strategy in its osm module but runs into performance problems. It appears that performing the joins in a DataFrame context, rather than an RDD context, addresses these concerns, likely because it removes the need for intermediate, per-row object allocation when performing the joins and instead maintains an Array of primitive values.

As part of this task we should pursue the DataFrame approach and add the resulting work to the vectorpipe package.
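
A minimal sketch of the kind of DataFrame join described above: explode each way's node references and join them against node coordinates. The column names are assumptions (ways(id, version, nds array of structs with a ref field), nodes(id, lat, lon)):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// One row per (way, node position); a later stage folds these into geometries.
def waysWithCoordinates(ways: DataFrame, nodes: DataFrame): DataFrame = {
  val exploded = ways.select(
    col("id").as("way_id"),
    col("version").as("way_version"),
    posexplode(col("nds")).as(Seq("idx", "nd")))

  exploded
    .join(nodes, exploded("nd.ref") === nodes("id"))
    .select(col("way_id"), col("way_version"), col("idx"), nodes("lat"), nodes("lon"))
}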

Sub tasks:

  • Generate sample VectorTile output
  • Group related OSM records using DataFrames
    • Generalize Seth's code
  • Define data structure to navigate the connected component
  • Mapping from connected component to Feature[G, VersionMetadata]
  • Test on OSM History

Check for off-by-ones in Spark Streaming sources

I'm still a bit fuzzy on how Offsets are handled. For the (pending) AugmentedDiffSource, a current sequence of 3094146 results in 3094145.json being fetched.

Similarly, commit() is called with values that I don't expect.

Offset handling in ReplicationStreamMicroBatchReader is likely to blame:

protected var start: Option[SequenceOffset] = options
  .get("start_sequence")
  .asScala
  .map(s => SequenceOffset(s.toInt))

protected var end: Option[SequenceOffset] = options
  .get("end_sequence")
  .asScala
  .map(s => SequenceOffset(s.toInt))

protected def getCurrentOffset: SequenceOffset

override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
  // TODO memoize this, valid for 30s at a time
  val currentOffset = getCurrentOffset

  // If Spark provided a start offset, use it; otherwise fall back to the
  // previously-stored start, or to one sequence behind the current offset.
  this.start = Some(
    start.asScala
      .map(_.asInstanceOf[SequenceOffset])
      .getOrElse {
        this.start.getOrElse {
          currentOffset - 1
        }
      })

  // If Spark provided an end offset, use it; otherwise advance from the previous
  // end (or start + 1), moving at most batchSize sequences at once and never
  // past the current offset.
  this.end = Some(
    end.asScala
      .map(_.asInstanceOf[SequenceOffset])
      .getOrElse {
        val next = this.end.map(_ + 1).getOrElse {
          this.start.get + 1
        }

        if (currentOffset > next) {
          SequenceOffset(math.min(currentOffset.sequence, next.sequence + batchSize))
        } else {
          next
        }
      })
}

Test tippecanoe generation of Vector Tiles for user stats heatmap

Tippecanoe generates pyramided vector tiles from newline-delimited GeoJSON, taking into account a lot of factors for how it structures the vector tiles through zoom levels. It would be advantageous for us to take advantage of that work. The test here is to:

  • Generate newline-delimited GeoJSONs for each node for commits by a user, potentially as multipoint features (see the sketch after the prototype below)
  • Run tippecanoe over this to generate vector tiles that can then be styled as a heatmap, similar to the vector tile spike that led to the below HTML prototype (previously I was baking the centroid of changesets, which is incorrect):
<!DOCTYPE html>
<html>
<head>
    <meta charset='utf-8' />
    <title></title>
    <meta name='viewport' content='initial-scale=1,maximum-scale=1,user-scalable=no' />
    <script src='https://api.tiles.mapbox.com/mapbox-gl-js/v0.41.0/mapbox-gl.js'></script>
    <link href='https://api.tiles.mapbox.com/mapbox-gl-js/v0.41.0/mapbox-gl.css' rel='stylesheet' />
    <style>
        body { margin:0; padding:0; }
        #map { position:absolute; top:0; bottom:0; width:100%; }
    </style>
</head>
<body>

<div id='map'></div>
<script>
mapboxgl.accessToken = 'pk.eyJ1IjoibG9zc3lyb2IiLCJhIjoiY2o3a3V2cWFmMmkyeDMybzJtem5xNmIzZyJ9.L1sWznm30l5lih8MAWGQ8A';
var map = new mapboxgl.Map({
    container: 'map',
    style: 'mapbox://styles/mapbox/light-v9',
    zoom: 10,
    center: [-122.447303, 37.753574]
});

 map.on('load', function () {

     //Heatmap layers also work with a vector tile source.
     map.addSource('footprint', {
         type: 'vector',
         tiles: ['https://s3.amazonaws.com/vectortiles/test-vts/peruser/piaco_dk/{z}/{x}/{y}.mvt']
     });

     map.addLayer({
         "id": "footprint-heat",
         "type": "heatmap",
         "source": "footprint",
         "source-layer": "user_footprint",
         "maxzoom": 13,
         "paint": {
             //Increase the heatmap weight based on frequency and property magnitude
             "heatmap-weight": {
                 "property": "mag",
                 "type": "exponential",
                 "stops": [
                     [0, 0],
                     [6, 1]
                 ]
             },
             //Increase the heatmap color weight by zoom level
             //heatmap-intensity is a multiplier on top of heatmap-weight
             "heatmap-intensity": {
                 "stops": [
                     [0, 1],
                     [9, 3]
                 ]
             },
             //Color ramp for heatmap.  Domain is 0 (low) to 1 (high).
             //Begin color ramp at 0-stop with a 0-transparency color
             //to create a blur-like effect.
             "heatmap-color": {
                 "stops": [
                     [0, "rgba(33,102,172,0)"],
                     [0.2, "rgb(103,169,207)"],
                     [0.4, "rgb(209,229,240)"],
                     [0.6, "rgb(253,219,199)"],
                     [0.8, "rgb(239,138,98)"],
                     [1, "rgb(178,24,43)"]
                 ]
             },
             //Adjust the heatmap radius by zoom level
             "heatmap-radius": {
                 "stops": [
                     [0, 2],
                     [9, 20]
                 ]
             },
             //Transition from heatmap to circle layer by zoom level
             "heatmap-opacity": {
                 "default": 1,
                 "stops": [
                     [7, 1],
                     [9, 0]
                 ]
             },
         }
     }, 'waterway-label');

     map.addLayer({
         "id": "footprint-point",
         "type": "circle",
         "source": "footprint",
         "source-layer": "user_footprint",
         "minzoom": 7,
         "paint": {
             //Size circle radius by earthquake magnitude and zoom level
             "circle-radius": {
                 "property": "mag",
                 "type": "exponential",
                 "stops": [
                     [{ zoom: 7, value: 1 }, 1],
                     [{ zoom: 7, value: 6 }, 4],
                     [{ zoom: 16, value: 1 }, 5],
                     [{ zoom: 16, value: 6 }, 50],
                 ]
             },
             //Color circle by earthquake magnitude
             "circle-color": {
                 "property": "mag",
                 "type": "exponential",
                 "stops": [
                     [1, "rgba(33,102,172,0)"],
                     [2, "rgb(103,169,207)"],
                     [3, "rgb(209,229,240)"],
                     [4, "rgb(253,219,199)"],
                     [5, "rgb(239,138,98)"],
                     [6, "rgb(178,24,43)"]
                 ]
             },
             "circle-stroke-color": "white",
             "circle-stroke-width": 1,
             //Transition from heatmap to circle layer by zoom level
             "circle-opacity": {
                 "stops": [
                     [7, 0],
                     [8, 1]
                 ]
             }
         }
     }, 'waterway-label');
});
</script>

</body>
</html>
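
A minimal sketch of the first step above (writing newline-delimited GeoJSON point features for a user's node edits, ready to feed into tippecanoe); the lat, lon, and changeset column names are assumptions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Emit one GeoJSON Feature per line; tippecanoe consumes the resulting text files directly.
def writeNdGeoJson(nodes: DataFrame, outputPath: String): Unit =
  nodes
    .select(concat(
      lit("""{"type":"Feature","properties":{"changeset":"""),
      col("changeset").cast("string"),
      lit("""},"geometry":{"type":"Point","coordinates":["""),
      col("lon").cast("string"), lit(","), col("lat").cast("string"),
      lit("]}}")).as("value"))
    .write
    .text(outputPath)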

Performance testing query server

When comparing GM/GW performance roughly one year ago, the GeoTrellis team produced some tooling for testing the performance of queries against Accumulo and HBase stores. We should reuse some of that effort here to get a sense of query performance in the relatively exotic HBase-on-S3 architecture we're pursuing.

Move aggregated stats into a JSON column

Adding a new statistic currently involves modifying the batch aggregator, the streaming aggregator, the database schema, and osmesa-stat-server.

If we move aggregations into a JSON column, we eliminate the need to change the database schema each time (which usually requires doing a backfill). We also gain a path to simplifying osmesa-stat-server to be less sensitive to available statistics (since it can infer what's available from the content of the JSON column).

This also provides a path for moving the augmented_diffs columns (which are used to track which sequences have been applied to a changeset's aggregate metrics) into the same (or a separate) JSON column, making the backfill requirement more granular (just backfill missing data, not everything again).

Read compressed augmented diffs

Augmented diffs are currently loaded as uncompressed JSON from s3://<bucket>/<path>/<sequence>.json.

Gzipped JSON is now available, e.g. s3://osm-detroit/augmented-diffs/003/288/525.json.gz. This reduces the amount of storage these take on S3 and makes them more manageable to list.

AugmentedDiffSource should read these compressed versions instead.
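
For reference, Spark's JSON reader decompresses *.json.gz objects transparently, so a batch read can simply point at the compressed paths; a hand-rolled reader (e.g. inside a streaming source) can wrap the object's input stream in a GZIPInputStream, roughly as sketched below:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.util.zip.GZIPInputStream

// Read an entire gzip-compressed JSON object into a string.
def readGzippedJson(in: InputStream): String = {
  val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), "UTF-8"))
  try Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString("\n")
  finally reader.close()
}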

Exclude AWS SDK JARs when creating an assembly

This shrinks the size of the assembly while also improving compatibility with EMR. When there are conflicts, Glue Catalog can't be used. When the AWS JARs are excluded, it works just fine (with provided versions of these libraries).

assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter { f =>
    f.data.getName.contains("aws-java-sdk")
  }
}

geotrellis-s3 (as gtS3) includes an exclude() in its dependencies; that may be able to go away using this approach.

(This should be applied to each of the projects here and probably ported to VP as well.)

Restructure osmesa repository

This will include moving the oneoffs to a separate repo, probably osmesa-apps, and cleaning up the analytics project.

It would be nice to address #164 as part of this.

Review Architecture and repository setup

@azavea/operations

This task is for getting Operations' feedback on the current architecture direction and repository setup. This will include a meeting to get an overview and review of architecture documentation.

Thinking in changesets

I had a conversation with @fosskers about the /users/uid route and we were talking about the underlying mechanism for generating the statistics. The original thinking was that there would be underlying users and campaigns tables in HBase that aggregated statistics such as "road count by user", "building count by user" at ingest time.

I want to make the case that the underlying table should be changesets and that users and hashtags are materialized views of grouping queries. (This was first brought up by @mojodna)

  1. Changesets are the primitive datatype above OSM entities (node, way, relation). A changeset has-a user and has-a hashtag.
  2. By aggregating without keeping changeset timestamp information we lose some valuable granular information. For example, we wouldn't know "when" a user has passed a certain threshold to award them a badge.

While I don't think osmesa should handle or be aware of user-end terms like "badges", it should allow downstream apps to process that granular information. By flipping the model of the Query API to "grab changesets filtered by user/campaign", we can create these use cases downstream.

OSMesa Dataset for OSM records

Currently OSMesa has streaming data sources that read augmented diffs and changesets:

osmesa.common.streaming.AugmentedDiffProvider
osmesa.common.streaming.ChangeProvider
osmesa.common.streaming.ChangesetProvider

The logic backing these can be used to implement an OSMesa DataSource that will consume the same files, given start and end dates, and produce a Dataset[OSM] / DStream[OSM].

This is very useful for debugging, working in Zeppelin notebooks and creating backfill batch jobs.

Enables: #48
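
A sketch of what that batch usage could look like; the format name and the sequence options here are hypothetical, mirroring the start_sequence/end_sequence options used by the streaming sources:

import org.apache.spark.sql.SparkSession

object AugmentedDiffBackfillSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("augmented-diff-backfill").getOrCreate()

    // Hypothetical source name and options for a bounded, batch read of augmented diffs.
    val diffs = spark.read
      .format("augmented-diffs")
      .option("start_sequence", "2801986")
      .option("end_sequence", "2801990")
      .load()

    diffs.printSchema()
  }
}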

Accompany augmented diff JSON w/ state.yaml

azavea/augdiff-pipeline#17 contains a Node app that converts Overpass augmented diffs into GeoJSON (with before/after geometries). Sequences are written to S3. Empty sequences are omitted, so the current way to retrieve the "latest" available sequence is to list the bucket/prefix (which gets slower as more data is added).

state.yaml should be created/updated after each sequence is written (or omitted) so that consumers can poll a single object to determine the current, definitive state of the stream.
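
A minimal sketch of that update using the AWS SDK for Java (v1); the bucket/prefix and the YAML keys shown are assumptions:

import com.amazonaws.services.s3.AmazonS3ClientBuilder

object StateYamlSketch {
  private val s3 = AmazonS3ClientBuilder.defaultClient()

  // Write a tiny, single-object summary that consumers can poll cheaply.
  def writeState(bucket: String, prefix: String, sequence: Int): Unit = {
    val body =
      s"""---
         |sequence: $sequence
         |""".stripMargin
    s3.putObject(bucket, s"$prefix/state.yaml", body)
  }
}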

Sample Analytic Vector Tile output

Connects: #28

We should generate sample vector tile output to get early integrations testing and iteration going.

Use the files at s3://osm-pds-tmp/ that were generated by Seth.

Expected Output:

  • Vector tiles at zoom level 12
  • All features go into a single vector tile layer named all
  • Feature tags contain created, modified, version, and type fields.
  • Features include highways, waterways, and buildings.

Some geometries may not be represented in the layer, and the modified tag may need to be mocked; I don't believe it can be correctly generated with the current vectorpipe code.

osm2pgsql / imposm3-style layer curation

We've been looking at vector tile generation for analytical purposes. Longer-term, we should consider this pipeline for stylized rendering purposes (tiler-perry FTW), which would involve layer configurations or mapping functions that go from geometry type/tags/zoom to processed geometry/tags/layer.

Address the deficit of documentation

The documentation is sparse right now despite the fact that we can already do the things promised. We should lay out some simple instructions to deploy and use the code.

Compile Exception for Generated File When Saving Geometries

I was trying to run the ExtractMultiPolygons script using the most current master, and I get this error whenever I try to write the geometries:

2018-05-30 08:23:33 ERROR CodeGenerator:91 - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 91, Column 0: Cannot compare types "java.lang.Object" and "long"
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 91, Column 0: Cannot compare types "java.lang.Object" and "long"
	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
	at org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:3913)
	at org.codehaus.janino.UnitCompiler.access$5800(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$10.visitBinaryOperation(UnitCompiler.java:3636)
	at org.codehaus.janino.UnitCompiler$10.visitBinaryOperation(UnitCompiler.java:3614)
	at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:4693)
	at org.codehaus.janino.UnitCompiler.compileBoolean(UnitCompiler.java:3614)
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4122)
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4679)
	at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$12.visitBinaryOperation(UnitCompiler.java:4091)
	at org.codehaus.janino.UnitCompiler$12.visitBinaryOperation(UnitCompiler.java:4070)
	at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:4693)
	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4070)
	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5253)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3477)
	at org.codehaus.janino.UnitCompiler.access$5300(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3439)
	at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3419)
	at org.codehaus.janino.Java$Assignment.accept(Java.java:4306)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3419)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2339)
	at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1473)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2851)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2455)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:1508)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2413)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1571)
	at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitDoStatement(UnitCompiler.java:1481)
	at org.codehaus.janino.UnitCompiler$6.visitDoStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3075)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
	at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
	at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1285)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:825)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:411)
	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1421)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
	at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
	at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.AppendColumnsExec.doExecute(objects.scala:261)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.MapGroupsExec.doExecute(objects.scala:329)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.inputRDDs(BroadcastHashJoinExec.scala:76)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at org.apache.spark.sql.DataFrameWriter.orc(DataFrameWriter.scala:570)
	at osmesa.ExtractMultiPolygons$$anonfun$$lessinit$greater$1.apply(ExtractMultiPolygons.scala:116)
	at osmesa.ExtractMultiPolygons$$anonfun$$lessinit$greater$1.apply(ExtractMultiPolygons.scala:39)
	at cats.SemigroupalArityFunctions$$anonfun$map3$1.apply(SemigroupalArityFunctions.scala:15)
	at cats.SemigroupalArityFunctions$$anonfun$map3$1.apply(SemigroupalArityFunctions.scala:15)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
	at cats.data.Validated.ap(Validated.scala:172)
	at cats.data.ValidatedApplicative.ap(Validated.scala:509)
	at cats.data.ValidatedApplicative.ap(Validated.scala:502)
	at cats.ComposedApply$$anonfun$ap$1$$anonfun$apply$1.apply(Composed.scala:25)
	at cats.Monad$$anonfun$map$1.apply(Monad.scala:16)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at com.monovore.decline.Parser.com$monovore$decline$Parser$$evalResult(Parser.scala:26)
	at com.monovore.decline.Parser.consumeAll(Parser.scala:91)
	at com.monovore.decline.Parser.apply(Parser.scala:17)
	at com.monovore.decline.Command.parse(opts.scala:17)
	at com.monovore.decline.CommandApp.main(CommandApp.scala:48)
	at osmesa.ExtractMultiPolygons.main(ExtractMultiPolygons.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-05-30 08:23:34 WARN  WholeStageCodegenExec:66 - Whole-stage codegen disabled for plan (id=12):
 *(12) Project [type#529, changeset#314L, id#469L, version#488, minorVersion#511, updated#490, validUntil#503, role#531, geom#318]
+- *(12) Filter (((isnull(memberUpdated#545) && isnull(memberValidUntil#546)) && isnull(geom#318)) || ((memberUpdated#545 <= updated#490) && (updated#490 < coalesce(memberValidUntil#546, 1527682999315000))))
   +- *(12) HashAggregate(keys=[role#531, validUntil#503, ref#530L, version#488, id#469L, updated#490, minorVersion#511, changeset#314L, type#529], functions=[], output=[changeset#314L, id#469L, version#488, minorVersion#511, updated#490, validUntil#503, type#529, role#531, memberUpdated#545, memberValidUntil#546, geom#318])
      +- *(12) HashAggregate(keys=[role#531, validUntil#503, ref#530L, version#488, id#469L, updated#490, minorVersion#511, changeset#314L, type#529], functions=[], output=[role#531, validUntil#503, ref#530L, version#488, id#469L, updated#490, minorVersion#511, changeset#314L, type#529])
         +- *(12) Project [changeset#314L, id#469L, version#488, minorVersion#511, updated#490, validUntil#503, member#521.type AS type#529, member#521.ref AS ref#530L, member#521.role AS role#531]
            +- *(12) Filter member#521.role IN (,outer,inner)
               +- Generate explode(members#49), [id#469L, version#488, changeset#314L, updated#490, validUntil#503, minorVersion#511], true, [member#521]
                  +- *(11) Project [id#469L, version#488, changeset#314L, updated#490, members#49, validUntil#503, (_we0#512 - 1) AS minorVersion#511]
                     +- Window [row_number() windowspecdefinition(id#469L, version#488, updated#490 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#512], [id#469L, version#488], [updated#490 ASC NULLS FIRST]
                        +- *(10) Sort [id#469L ASC NULLS FIRST, version#488 ASC NULLS FIRST, updated#490 ASC NULLS FIRST], false, 0
                           +- Window [lead(updated#490, 1, null) windowspecdefinition(id#469L, updated#490 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS validUntil#503], [id#469L], [updated#490 ASC NULLS FIRST]
                              +- *(9) Sort [id#469L ASC NULLS FIRST, updated#490 ASC NULLS FIRST], false, 0
                                 +- Exchange hashpartitioning(id#469L, 200)
                                    +- *(8) Project [id#469L, version#488, changeset#314L, updated#490, members#49]
                                       +- *(8) BroadcastHashJoin [id#469L, cast(version#488 as bigint)], [id#0L, version#11L], Inner, BuildLeft
                                          :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true], cast(input[2, int, false] as bigint)))
                                          :  +- *(5) Filter isnotnull(version#488)
                                          :     +- *(5) HashAggregate(keys=[changeset#314L, id#469L], functions=[max(version#11L), max(updated#317)], output=[changeset#314L, id#469L, version#488, updated#490])
                                          :        +- Exchange hashpartitioning(changeset#314L, id#469L, 200)
                                          :           +- *(4) HashAggregate(keys=[changeset#314L, id#469L], functions=[partial_max(version#11L), partial_max(updated#317)], output=[changeset#314L, id#469L, max#944L, max#945])
                                          :              +- Union
                                          :                 :- LocalTableScan <empty>, [changeset#314L, id#469L, version#11L, updated#317]
                                          :                 +- *(3) Project [changeset#7L, id#0L, version#11L, timestamp#8 AS updated#474]
                                          :                    +- *(3) Filter UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END)
                                          :                       +- Window [lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#51, lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#52], [id#0L], [version#11L ASC NULLS FIRST]
                                          :                          +- *(2) Sort [id#0L ASC NULLS FIRST, version#11L ASC NULLS FIRST], false, 0
                                          :                             +- *(2) Project [id#0L, changeset#7L, timestamp#8, version#11L, visible#12, tags#2]
                                          :                                +- Exchange hashpartitioning(id#0L, 200)
                                          :                                   +- *(1) Project [id#0L, tags#2, changeset#7L, timestamp#8, version#11L, visible#12]
                                          :                                      +- *(1) Filter ((isnotnull(type#1) && (type#1 = relation)) && isnotnull(id#0L))
                                          :                                         +- *(1) FileScan orc [id#0L,type#1,tags#2,changeset#7L,timestamp#8,version#11L,visible#12] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/tmp/rhode-island.orc], PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,relation), IsNotNull(id)], ReadSchema: struct<id:bigint,type:string,tags:map<string,string>,changeset:bigint,timestamp:timestamp,version...
                                          +- *(8) Project [id#0L, version#11L, members#49]
                                             +- *(8) Filter (UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END) && isnotnull(version#11L))
                                                +- Window [lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#51, lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#52], [id#0L], [version#11L ASC NULLS FIRST]
                                                   +- *(7) Sort [id#0L ASC NULLS FIRST, version#11L ASC NULLS FIRST], false, 0
                                                      +- *(7) Project [id#0L, UDF(members#6) AS members#49, version#11L, visible#12, tags#2]
                                                         +- Exchange hashpartitioning(id#0L, 200)
                                                            +- *(6) Project [id#0L, tags#2, members#6, version#11L, visible#12]
                                                               +- *(6) Filter ((isnotnull(type#1) && (type#1 = relation)) && isnotnull(id#0L))
                                                                  +- *(6) FileScan orc [id#0L,type#1,tags#2,members#6,version#11L,visible#12] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/tmp/rhode-island.orc], PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,relation), IsNotNull(id)], ReadSchema: struct<id:bigint,type:string,tags:map<string,string>,members:array<struct<type:string,ref:bigint,...

2018-05-30 08:23:42 ERROR CodeGenerator:91 - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 83, Column 0: Cannot compare types "java.lang.Object" and "long"
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 83, Column 0: Cannot compare types "java.lang.Object" and "long"
	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
	at org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:3913)
	at org.codehaus.janino.UnitCompiler.access$5800(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$10.visitBinaryOperation(UnitCompiler.java:3636)
	at org.codehaus.janino.UnitCompiler$10.visitBinaryOperation(UnitCompiler.java:3614)
	at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:4693)
	at org.codehaus.janino.UnitCompiler.compileBoolean(UnitCompiler.java:3614)
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4122)
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4679)
	at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$12.visitBinaryOperation(UnitCompiler.java:4091)
	at org.codehaus.janino.UnitCompiler$12.visitBinaryOperation(UnitCompiler.java:4070)
	at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:4693)
	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4070)
	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5253)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3477)
	at org.codehaus.janino.UnitCompiler.access$5300(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3439)
	at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3419)
	at org.codehaus.janino.Java$Assignment.accept(Java.java:4306)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3419)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2339)
	at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1473)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2851)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2455)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:1508)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2413)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
	at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
	at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$Block.accept(Java.java:2756)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1571)
	at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$6.visitDoStatement(UnitCompiler.java:1481)
	at org.codehaus.janino.UnitCompiler$6.visitDoStatement(UnitCompiler.java:1466)
	at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3075)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
	at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
	at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1285)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:825)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:411)
	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1421)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
	at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
	at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.AppendColumnsExec.doExecute(objects.scala:261)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.MapGroupsExec.doExecute(objects.scala:329)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.inputRDDs(BroadcastHashJoinExec.scala:76)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at org.apache.spark.sql.DataFrameWriter.orc(DataFrameWriter.scala:570)
	at osmesa.ExtractMultiPolygons$$anonfun$$lessinit$greater$1.apply(ExtractMultiPolygons.scala:116)
	at osmesa.ExtractMultiPolygons$$anonfun$$lessinit$greater$1.apply(ExtractMultiPolygons.scala:39)
	at cats.SemigroupalArityFunctions$$anonfun$map3$1.apply(SemigroupalArityFunctions.scala:15)
	at cats.SemigroupalArityFunctions$$anonfun$map3$1.apply(SemigroupalArityFunctions.scala:15)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
	at cats.data.Validated.ap(Validated.scala:172)
	at cats.data.ValidatedApplicative.ap(Validated.scala:509)
	at cats.data.ValidatedApplicative.ap(Validated.scala:502)
	at cats.ComposedApply$$anonfun$ap$1$$anonfun$apply$1.apply(Composed.scala:25)
	at cats.Monad$$anonfun$map$1.apply(Monad.scala:16)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at cats.instances.Function0Instances$$anon$1$$anonfun$flatMap$1.apply(function.scala:25)
	at com.monovore.decline.Parser.com$monovore$decline$Parser$$evalResult(Parser.scala:26)
	at com.monovore.decline.Parser.consumeAll(Parser.scala:91)
	at com.monovore.decline.Parser.apply(Parser.scala:17)
	at com.monovore.decline.Command.parse(opts.scala:17)
	at com.monovore.decline.CommandApp.main(CommandApp.scala:48)
	at osmesa.ExtractMultiPolygons.main(ExtractMultiPolygons.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-05-30 08:23:42 WARN  WholeStageCodegenExec:66 - Whole-stage codegen disabled for plan (id=29):
 *(29) Project [type#783, changeset#314L, id#723L, version#742, minorVersion#765, updated#744, validUntil#757, role#785, geom#318]
+- *(29) Filter (((isnull(memberUpdated#798) && isnull(memberValidUntil#799)) && isnull(geom#318)) || ((memberUpdated#798 <= updated#744) && (updated#744 < coalesce(memberValidUntil#799, 1527682999315000))))
   +- *(29) HashAggregate(keys=[role#785, validUntil#757, ref#784L, version#742, id#723L, updated#744, minorVersion#765, changeset#314L, type#783], functions=[], output=[changeset#314L, id#723L, version#742, minorVersion#765, updated#744, validUntil#757, type#783, role#785, memberUpdated#798, memberValidUntil#799, geom#318])
      +- *(29) HashAggregate(keys=[role#785, validUntil#757, ref#784L, version#742, id#723L, updated#744, minorVersion#765, changeset#314L, type#783], functions=[], output=[role#785, validUntil#757, ref#784L, version#742, id#723L, updated#744, minorVersion#765, changeset#314L, type#783])
         +- *(29) Project [changeset#314L, id#723L, version#742, minorVersion#765, updated#744, validUntil#757, member#775.type AS type#783, member#775.ref AS ref#784L, member#775.role AS role#785]
            +- Generate explode(members#49), [id#723L, version#742, changeset#314L, updated#744, validUntil#757, minorVersion#765], true, [member#775]
               +- *(28) Project [id#723L, version#742, changeset#314L, updated#744, members#49, validUntil#757, (_we0#766 - 1) AS minorVersion#765]
                  +- Window [row_number() windowspecdefinition(id#723L, version#742, updated#744 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#766], [id#723L, version#742], [updated#744 ASC NULLS FIRST]
                     +- *(27) Sort [id#723L ASC NULLS FIRST, version#742 ASC NULLS FIRST, updated#744 ASC NULLS FIRST], false, 0
                        +- Window [lead(updated#744, 1, null) windowspecdefinition(id#723L, updated#744 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS validUntil#757], [id#723L], [updated#744 ASC NULLS FIRST]
                           +- *(26) Sort [id#723L ASC NULLS FIRST, updated#744 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(id#723L, 200)
                                 +- *(25) Project [id#723L, version#742, changeset#314L, updated#744, members#49]
                                    +- *(25) BroadcastHashJoin [id#723L, cast(version#742 as bigint)], [id#0L, version#11L], Inner, BuildLeft
                                       :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true], cast(input[2, int, false] as bigint)))
                                       :  +- *(22) Filter isnotnull(version#742)
                                       :     +- *(22) HashAggregate(keys=[changeset#314L, id#723L], functions=[max(version#11L), max(updated#317)], output=[changeset#314L, id#723L, version#742, updated#744])
                                       :        +- Exchange hashpartitioning(changeset#314L, id#723L, 200)
                                       :           +- *(21) HashAggregate(keys=[changeset#314L, id#723L], functions=[partial_max(version#11L), partial_max(updated#317)], output=[changeset#314L, id#723L, max#948L, max#949])
                                       :              +- Union
                                       :                 :- LocalTableScan <empty>, [changeset#314L, id#723L, version#11L, updated#317]
                                       :                 +- *(20) Project [changeset#7L, id#0L, version#11L, timestamp#8 AS updated#728]
                                       :                    +- *(20) Filter (UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END) && UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END))
                                       :                       +- Window [lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#51, lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#52], [id#0L], [version#11L ASC NULLS FIRST]
                                       :                          +- *(19) Sort [id#0L ASC NULLS FIRST, version#11L ASC NULLS FIRST], false, 0
                                       :                             +- *(19) Project [id#0L, changeset#7L, timestamp#8, version#11L, visible#12, tags#2]
                                       :                                +- ReusedExchange [id#0L, tags#2, changeset#7L, timestamp#8, version#11L, visible#12], Exchange hashpartitioning(id#0L, 200)
                                       +- *(25) Project [id#0L, version#11L, members#49]
                                          +- *(25) Filter ((UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END) && UDF(CASE WHEN (NOT visible#12 && isnotnull(_we0#51)) THEN _we1#52 ELSE tags#2 END)) && isnotnull(version#11L))
                                             +- Window [lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#51, lag(tags#2, 1, null) windowspecdefinition(id#0L, version#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we1#52], [id#0L], [version#11L ASC NULLS FIRST]
                                                +- *(24) Sort [id#0L ASC NULLS FIRST, version#11L ASC NULLS FIRST], false, 0
                                                   +- *(24) Project [id#0L, UDF(members#6) AS members#49, version#11L, visible#12, tags#2]
                                                      +- ReusedExchange [id#0L, tags#2, members#6, version#11L, visible#12], Exchange hashpartitioning(id#0L, 200)

Done.

S3 tile writer

Currently implemented within DataFrame.foreach { ... }, but would potentially be useful as a sink, accepting some data structure that includes the target URI, bytes to write, whether to gzip, the content-type, and possibly other metadata.
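
A minimal sketch of what such a sink's input and writer might look like, assuming the AWS SDK v1 S3 client; TileWrite and writeTiles are hypothetical names for illustration, not existing osmesa APIs:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.GZIPOutputStream

import com.amazonaws.services.s3.{AmazonS3ClientBuilder, AmazonS3URI}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import org.apache.spark.sql.Dataset

// One record per tile to be written; mirrors the fields listed above.
case class TileWrite(uri: String, bytes: Array[Byte], gzip: Boolean, contentType: String)

def writeTiles(tiles: Dataset[TileWrite]): Unit =
  tiles.foreachPartition { partition: Iterator[TileWrite] =>
    val s3 = AmazonS3ClientBuilder.defaultClient()  // one client per partition
    partition.foreach { tile =>
      val payload =
        if (tile.gzip) {
          val bos = new ByteArrayOutputStream()
          val gz = new GZIPOutputStream(bos)
          gz.write(tile.bytes)
          gz.close()
          bos.toByteArray
        } else tile.bytes

      val meta = new ObjectMetadata()
      meta.setContentType(tile.contentType)
      if (tile.gzip) meta.setContentEncoding("gzip")
      meta.setContentLength(payload.length.toLong)

      val target = new AmazonS3URI(tile.uri)  // e.g. s3://bucket/z/x/y.mvt
      s3.putObject(new PutObjectRequest(target.getBucket, target.getKey, new ByteArrayInputStream(payload), meta))
    }
  }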

See also #93

Histogram tiles are untenable at scale

As a consequence of #150, it is now known that the edit histogram tile approach doesn't scale well. After running against the full OSM dataset, many large tiles were produced. These are multi-megabyte tiles, whereas the guidelines (referred to here; the link to the Mapbox docs is broken) suggest (and possibly enforce) a 500 KB file size limit. This may explain the apparently broken demos where tiles exist but are not displayed.

It might be time for a ground-floor rethink of the representation of these histograms to allow for more parsimonious tiles.

Streams can't be resumed from checkpoints

When checkpointLocation is set, resuming execution fails with:

osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more
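
The stream in question was started with the following snippet (where ss is the active SparkSession):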
ss.readStream
  .format("augmented-diffs")
  .load
  .writeStream
  .option("checkpointLocation", "/tmp/augmented-diff-checkpoint")
  .format("console")
  .start

This appears to be a Spark bug with v2 streaming data sources. See SPARK-25257.

Tool to apply diffs to an ORC file to update it

As discussed in #25, we should create a tool that:

  • checks the replication sequence number from the ORC file (which is TBD; it should be in the OSM PBF metadata but isn't currently copied to the ORC user metadata by osm2orc),
  • fetches all OsmChange diffs (ideally minutely, to capture incremental changes) from planet.osm.org (or an S3 mirror, also TBD), and
  • applies them to the ORC file as additional rows, writing the result back out to S3 (probably partitioned, probably unsorted, since that makes the task faster and the output is not intended to be downloaded or used for purposes that assume sorting).
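
A rough sketch of the core update step, assuming the diffs have already been parsed into a DataFrame matching the ORC schema; applyDiffs is a hypothetical name and the dedup key is an assumption, with partitioning/sorting decisions omitted:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Append parsed OsmChange rows to the existing history and write an updated copy.
def applyDiffs(spark: SparkSession, orcPath: String, diffs: DataFrame, outPath: String): Unit = {
  val history = spark.read.orc(orcPath)
  history
    .union(diffs.select(history.columns.map(col): _*))  // align column order with the history schema
    .dropDuplicates("type", "id", "version")             // tolerate overlapping replication sequences
    .write
    .mode("overwrite")
    .orc(outPath)
}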

Upgrade to VectorPipe 2.0

This will require an upgrade to GT 3.1 and Spark 2.4+, so it may not be entirely trivial. We'll want to test this on staging and ensure our ECS tasks continue to function properly.
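
For illustration only, a hypothetical build.sbt bump; the organizations and versions shown are assumptions and should be confirmed against the VectorPipe 2.0 and GeoTrellis 3.1 release notes:

libraryDependencies ++= Seq(
  "com.azavea.geotrellis"       %% "vectorpipe"        % "2.0.0",   // assumed coordinates
  "org.locationtech.geotrellis" %% "geotrellis-vector" % "3.1.0",   // GT 3.1
  "org.apache.spark"            %% "spark-sql"         % "2.4.4" % Provided
)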

Write tiles as Tapalcatl 2 archives: zipped pyramids of vector tiles on S3

(I'm paraphrasing a bit here, because I may be interpreting tapalcatl's intent slightly differently.) Tapalcatl (Python) facilitates storage of multiple formats and tiles within a single meta-tile. The meta-tile is a zip file. (This is effectively the generalized tile equivalent of a Cloud Optimized GeoTIFF.)

I propose grouping pyramids of generated vector tiles (initially 8 zoom levels, but that may be too many depending on the size of the zip file index and file itself) and zipping them together prior to writing to S3. This will dramatically reduce the number of objects written to S3 (reducing latency and cost) while facilitating improved caching of data.

Under this proposal, if /4/5/6.mvt (the target tile) were requested, /0/0/0.zip (the meta tile) would be fetched and /4/5/6.mvt extracted from it.
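
A small sketch of the lookup implied by that example, assuming pyramids of `depth` zoom levels rooted at zooms 0, depth, 2×depth, ... and entries keyed by the requested tile's own z/x/y path (both assumptions, not a fixed spec):

case class Tile(z: Int, x: Int, y: Int)

// Returns the meta tile to fetch and the entry to extract from its zip archive.
def metaTileFor(tile: Tile, depth: Int = 8): (Tile, String) = {
  val rootZ = tile.z - (tile.z % depth)   // zoom of the enclosing pyramid root
  val shift = tile.z - rootZ
  (Tile(rootZ, tile.x >> shift, tile.y >> shift), s"${tile.z}/${tile.x}/${tile.y}.mvt")
}

// metaTileFor(Tile(4, 5, 6)) == (Tile(0, 0, 0), "4/5/6.mvt"): fetch /0/0/0.zip
// and extract 4/5/6.mvt, matching the example above.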

Zip files include a central directory with entry offsets, so partial reads of the archive are possible: three requests in total, two for the directory (which can be cached) and one for the entry itself (which can be cached as part of larger block reads, e.g. 10 MB-aligned). See https://github.com/mojodna/tilelive-tapalcatl for a very preliminary implementation of this.

Reader support for tapalcatl meta tiles could be implemented in client code, via a proxy server (tapalcatl-py) that allows individual tile requests, or as a Service Worker that can intercept tile requests + cache blocks.

Relation geometry construction

Preliminary relation types to target (comparing with osm2pgsql output for the same relation versions):

  • type=multipolygon - output: Polygon / MultiPolygon
  • type=boundary - output: Polygon / MultiPolygon
  • type=route - output: MultiLineString / GeometryCollection (for the route itself) + GeometryCollection (for stops) + GeometryCollection (for platforms)

Generate development environment stack

This will culminate in a docker-compose.yml and supporting scripts to bring up the following components:

  • HDFS
  • ZooKeeper
  • HBase with GeoMesa extensions
  • GeoServer
  • Zeppelin

Testing this setup should include some version of an ingest and view through GeoServer that matches http://www.geomesa.org/documentation/tutorials/geomesa-quickstart-hbase.html, as well as accessing GeoMesa through SparkSQL in a Zeppelin notebook.

References:
  • https://github.com/geodocker/geodocker-geomesa
  • https://github.com/ccri/cloud-local
  • https://github.com/geotrellis/geodocker-cluster/tree/daunnc/hbase/hbase

Minutely updates, augmented diffs, and queries

Earlier this week, @lossyrob @moradology @kamicut and I got together in Philly to talk about taking OSMesa forward and kicked around some ideas. I just want to drop notes here, and invite comments from anyone who's interested.

Recap of what we have now

  1. Weekly updated full history ORC files hosted on AWS.
  2. A collection of scripts, run on Apache Spark, that performs periodic analysis jobs.

Where we want to get to

  1. Minutely updates.
  2. Augmented diffs for storing minutely changes.
  3. Streaming minutely augmented diffs.
  4. Infrastructure for arbitrary queries using tags and bboxes.
  5. Infrastructure for periodic analytics jobs.

To be clear, we're not proposing the above as the only future of OSMesa; rather, we're thinking about the different parts we need to build to make this work useful for the larger OSM community. We're using this repo to anchor the discussion for now, and will eventually fork into other repos when needed.

ORC to augmented diffs

  • We can use the weekly ORC files to seed history parsing as horizontally scalable Spark jobs, which is much faster than parsing the planet PBF.
  • OSMesa already has utilities that can do parts of this.
  • This can be pushed down a stream for consumers.

Nodecache for lookup

  • Building full and intermediate versions of geometries is an important part of producing augmented diffs. This means we'll have to store the current (or perhaps all previous) versions of all nodes in OSM.
  • There have been previous experiments using DynamoDB and RocksDB, but collectively we think DynamoDB might be better because it's managed and hopefully won't get too expensive.
  • The datastore should be optimized for fast lookup. We'll store node versions and membership, but no geometries.
  • We'll persist the nodecache on S3 for recovery against a failure.
  • The nodecache potentially only needs to contain the current version of all nodes.

Minutely processing

  • For every minutely change, we look up the nodecache to build the augmented diff, then write it both to S3 and into the stream.

Analytics/Query Data Store

Batch jobs

  • We'll build a process that listens to the augmented diffs stream and incrementally stores the data for running periodic analytics; see the sketch below. The store will be optimized for long-running batch jobs rather than query response speed.
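
A minimal sketch of such a listener, assuming the "augmented-diffs" streaming source shown earlier is on the classpath; the S3 paths and trigger interval are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("augmented-diff-archiver").getOrCreate()

// Continuously append augmented diffs to an ORC dataset that batch jobs can read.
// Note: resuming from checkpointLocation is currently blocked by SPARK-25257 (see above).
spark.readStream
  .format("augmented-diffs")
  .load
  .writeStream
  .format("orc")
  .option("path", "s3://<analytics-bucket>/augmented-diffs/")
  .option("checkpointLocation", "s3://<analytics-bucket>/checkpoints/augmented-diffs/")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
  .awaitTermination()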

Query server

  • We probably want to think of spatio-temporal queries as a separate use case: the more predictable the queries are, the more control we have over indices, especially for HBase-like data stores. We discussed GeoMesa as the underlying store here, with some sort of frontend server for API ergonomics.

