
spark-bigquery's Introduction

spark-bigquery

This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by spotify/spark-bigquery, but there are several differences and enhancements:

  • Use of the Structured Streaming API

  • Use within Pyspark

  • Saving via Decorators

  • Allow saving to partitioned tables

  • Easy integration with Databricks

  • Use of Standard SQL

  • Use Of Time-Ingested Partition Columns

  • Run Data Manipulation Language (DML) queries

  • Update schemas on writes using the setSchemaUpdateOptions

  • JSON is used as an intermediate format instead of Avro. This allows having fields on different levels named the same:

{
  "obj": {
    "data": {
      "data": {}
    }
  }
}
  • DataFrame's schema is automatically adapted to a legal one (see the sketch after this list):

    1. Illegal characters are replaced with _
    2. Field names are converted to lower case to avoid ambiguity
    3. Duplicate field names are given a numeric suffix (_1, _2, etc.)
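
The sketch below shows how these three rules could be applied to a list of field names. It is an illustration only, not the library's actual code (the real conversion lives in SchemaConverters):

// Illustration only (not the library's actual code): applying the three rules above.
def sanitizeFieldNames(names: Seq[String]): Seq[String] = {
  // Rules 1 and 2: replace illegal characters with _ and lower-case the name.
  val cleaned = names.map(_.replaceAll("[^a-zA-Z0-9_]", "_").toLowerCase)
  // Rule 3: give duplicate names a numeric suffix (_1, _2, ...).
  val seen = scala.collection.mutable.Map.empty[String, Int]
  cleaned.map { name =>
    val count = seen.getOrElse(name, 0)
    seen(name) = count + 1
    if (count == 0) name else s"${name}_$count"
  }
}

// sanitizeFieldNames(Seq("First Name", "first-name", "Amount$"))
// => Seq("first_name", "first_name_1", "amount_")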

Docker!

I created a container that launches Zeppelin with Spark and the connector for ease of use and quick startup. You can find it here.

Usage

Including spark-bigquery into your project

Maven

<repositories>
  <repository>
    <id>oss-sonatype</id>
    <name>oss-sonatype</name>
    <url>https://oss.sonatype.org/content/repositories/releases/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.github.samelamin</groupId>
    <artifactId>spark-bigquery_${scala.binary.version}</artifactId>
    <version>0.2.6</version>
  </dependency>
</dependencies>

SBT

To use it in a local SBT console, first add the package as a dependency, then set up your project details:

resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.6"
import com.samelamin.spark.bigquery._

// Set up GCP credentials
sqlContext.setGcpJsonKeyFile("<JSON_KEY_FILE>")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("<BILLING_PROJECT>")
sqlContext.setBigQueryGcsBucket("<GCS_BUCKET>")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("<DATASET_LOCATION>")

Structured Streaming from S3/HDFS to BigQuery

S3, Blob Storage, and HDFS are the de facto technologies for storage in the cloud; this package allows you to stream any data added to them into a BigQuery table of your choice.

import com.samelamin.spark.bigquery._

val df = spark.readStream.json("s3a://bucket")

df.writeStream
      .option("checkpointLocation", "s3a://checkpoint/dir")
      .option("tableReferenceSink","my-project:my_dataset.my_table")
      .format("com.samelamin.spark.bigquery")
      .start()

Structured Streaming from BigQuery Table

You can use this connector to stream from a BigQuery table. The connector uses a timestamp column to determine offsets.

import com.samelamin.spark.bigquery._

val df = spark
          .readStream
          .option("tableReferenceSource","my-project:my_dataset.my_table")
          .format("com.samelamin.spark.bigquery")
          .load()

You can also specify a custom timestamp column:

import com.samelamin.spark.bigquery._

sqlContext.setBQTableTimestampColumn("column_name")

You can also specify a custom Time Ingested Partition column:

import com.samelamin.spark.bigquery._

sqlContext.setBQTimePartitioningField("column_name")

Saving DataFrame using BigQuery Hadoop writer API

By default, any table created by this connector has a timestamp column named bq_load_timestamp, which is set to the current timestamp.

import com.samelamin.spark.bigquery._

val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")

You can also save to a table decorator by saving to dataset-id.table-name$YYYYMMDD
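
For example, a minimal sketch of a decorator write (the project, dataset, table, and date are placeholders, and df is the DataFrame from the example above):

// Sketch: write into the 2018-01-01 partition of an existing day-partitioned table
// by appending the YYYYMMDD decorator to the table name.
df.saveAsBigQueryTable("project-id:dataset-id.table-name$20180101")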

Saving DataFrame using Pyspark

from pyspark.sql import SparkSession

BQ_PROJECT_ID = "projectId"
DATASET_ID = "datasetId"
TABLE_NAME = "tableName"

KEY_FILE = "/path/to/service_account.json" # When not on GCP
STAGING_BUCKET = "gcs-bucket"              # Intermediate JSON files
DATASET_LOCATION = "US"                    # Location for dataset creation

# Start the session and reference the JVM package via py4j for convenience
session = SparkSession.builder.getOrCreate()
bigquery = session._sc._jvm.com.samelamin.spark.bigquery

# Prepare the bigquery context
bq = bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
bq.setGcpJsonKeyFile(KEY_FILE)
bq.setBigQueryProjectId(BQ_PROJECT_ID)
bq.setGSProjectId(BQ_PROJECT_ID)
bq.setBigQueryGcsBucket(STAGING_BUCKET)
bq.setBigQueryDatasetLocation(DATASET_LOCATION)

# Extract and Transform a dataframe
# df = session.read.csv(...)

# Load into a table or table partition
bqDF = bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable(
    "{0}:{1}.{2}".format(BQ_PROJECT_ID, DATASET_ID, TABLE_NAME),
    False, # Day-partitioned when created
    0,     # Partition expiration when created
    bigquery.__getattr__("package$WriteDisposition$").__getattr__("MODULE$").WRITE_EMPTY(),
    bigquery.__getattr__("package$CreateDisposition$").__getattr__("MODULE$").CREATE_IF_NEEDED(),
)

Submit with:

pyspark yourjob.py --packages com.github.samelamin:spark-bigquery_2.11:0.2.6

Or

gcloud dataproc jobs submit pyspark yourjob.py --properties spark.jars.packages=com.github.samelamin:spark-bigquery_2.11:0.2.6

Reading DataFrame From BigQuery

import com.samelamin.spark.bigquery._
val sqlContext = spark.sqlContext

sqlContext.setBigQueryGcsBucket("bucketname")
sqlContext.setBigQueryProjectId("projectid")
sqlContext.setGcpJsonKeyFile("keyfilepath")
sqlContext.hadoopConf.set("fs.gs.project.id","projectid")

val df = spark.sqlContext.read.format("com.samelamin.spark.bigquery").option("tableReferenceSource","bigquery-public-data:samples.shakespeare").load()
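
To read the result of a query rather than a whole table from Scala, a sketch along the lines of the PySpark example below should work, assuming the bigQuerySelect method is exposed on the SQLContext through the same implicits as the setters above:

import com.samelamin.spark.bigquery._

// Sketch (assumes bigQuerySelect is available via the package implicits):
// run a query and get the result back as a DataFrame.
val shakespeareDF = sqlContext.bigQuerySelect(
  "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")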

Reading DataFrame From BigQuery in Pyspark

from pyspark.sql import DataFrame

bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
df = DataFrame(bq.bigQuerySelect("SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"), spark._wrapped)

Running DML Queries

import com.samelamin.spark.bigquery._

// Run a DML statement (must be Standard SQL)
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")

Please note that DML queries must be written in Standard SQL.

Update Schemas

You can also allow saving a DataFrame to update the table schema:

import com.samelamin.spark.bigquery._

sqlContext.setAllowSchemaUpdates()
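
Putting the pieces together, a minimal sketch (the table reference is a placeholder and df is an existing DataFrame) of a write that is allowed to evolve the target table's schema:

import com.samelamin.spark.bigquery._

// Sketch: enable schema updates, then save a DataFrame whose schema
// has gained columns that the target table does not yet have.
sqlContext.setAllowSchemaUpdates()
df.saveAsBigQueryTable("project-id:dataset-id.table-name")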

Notes on using this API:

  • Structured Streaming needs a partitioned table, which is created by default when writing a stream
  • Structured Streaming needs a timestamp column from which offsets are retrieved; by default, all tables are created with a bq_load_timestamp column set to the current timestamp.
  • For use with Databricks, please follow this guide

TODO

Upgrade the Spark version.

License

Copyright 2016 samelamin.

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

spark-bigquery's People

Contributors

jonas, phatak-dev, samelamin, vijaykramesh


spark-bigquery's Issues

Databricks - GCP Key Missing in Multi-Node Cluster

Hi Sam,

I ran into an interesting issue today. I'm not sure of the cause but suspect it's me - it just happened, so I need to do some deeper digging, but I thought to reach out in case you quickly spot a rookie mistake and could help.

My notebook in the community edition works great and writes my dataframes out to the designated BigQuery tables as expected. All good.

Now that I've recently subscribed to a paid Databricks account, I run the exact same code on a simple 2-node cluster (1 driver + 1 worker) and I get an exception that the Google key file cannot be located.

https://www.dropbox.com/s/6kpsqsgqkb2tp2a/Screenshot%202017-05-09%2019.28.39.png?dl=0

but the key is there as can be seen by this:

https://www.dropbox.com/s/yzviehngyhlz5e6/Screenshot%202017-05-09%2019.29.43.png?dl=0

As mentioned, it's exactly the same code in both editions. For prototyping, I set up my key in the notebook by just saving the JSON key string to the local file system using the line below:

dbutils.fs.put("file:/gcpkey.json",gcpKey)

and reference it in the BQ library as "/gcpkey.json" - which, as mentioned, works fine in the community edition.

The difference between community and the proper subscription, as you know, is that on community the driver and worker are a single physical node. I can only guess at the moment (as this just happened) that the key file isn't on the worker in a multi-node environment; this dbutils command just saves on the driver node and I need to distribute it to the worker? Documentation on it isn't so clear.

I assume you've had this working on a multi-node cluster on Databricks, so no doubt it's my rookie mistake on the platform! Is the issue that it just writes to the driver node, or something else? What are your best practices for distributing a sensitive key like this securely?

I'm assuming there is a better mechanism for production using the Databricks REST API to bootstrap a cluster with this (to be investigated), but for now I was just using this simple mechanism.

Thoughts?

Thanks heaps for your help!
Cheers

saveAsBigQueryTable exception with StructType column

I am trying to save data using saveAsBigQueryTable, but this results in an error. I simplified the dataframe to a single column, a StructType one; this is the schema:

root
 |-- contexto: struct (nullable = true)
 |    |-- b2wChannel: string (nullable = true)
 |    |-- b2wDeviceType: string (nullable = true)
 |    |-- b2wEPar: string (nullable = true)
 |    |-- b2wOpn: string (nullable = true)
 |    |-- b2wPid: string (nullable = true)
 |    |-- b2wSid: string (nullable = true)
 |    |-- b2wUid: string (nullable = true)
 |    |-- customerId: string (nullable = true)
 |    |-- salesSolution: string (nullable = true)

When using other fields (of other types) the dataset is saved as expected, so the error is with StructType columns.

This is the stack trace:

[error] (run-main-0) java.lang.NoSuchMethodError: org.json4s.JsonDSL$.seq2jvalue(Lscala/collection/Traversable;Lscala/Function1;)Lorg/json4s/JsonAST$JArray;
[error] java.lang.NoSuchMethodError: org.json4s.JsonDSL$.seq2jvalue(Lscala/collection/Traversable;Lscala/Function1;)Lorg/json4s/JsonAST$JArray;
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$typeToJson$3.apply(SchemaConverters.scala:66)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$typeToJson$3.apply(SchemaConverters.scala:66)
[error] 	at org.json4s.JsonDSL$JsonAssoc.$tilde(JsonDSL.scala:90)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$.typeToJson(SchemaConverters.scala:66)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$.com$samelamin$spark$bigquery$converters$SchemaConverters$$fieldToJson(SchemaConverters.scala:84)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:87)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:87)
[error] 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[error] 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[error] 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[error] 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[error] 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[error] 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
[error] 	at com.samelamin.spark.bigquery.converters.SchemaConverters$.SqlToBQSchema(SchemaConverters.scala:87)
[error] 	at com.samelamin.spark.bigquery.BigQueryDataFrame.saveAsBigQueryTable(BigQueryDataFrame.scala:41)
[error] 	at ml.freight.prepare.CarregaPedidos$.delayedEndpoint$ml$freight$prepare$CarregaPedidos$1(CarregaPedidos.scala:24)
[error] 	at ml.freight.prepare.CarregaPedidos$delayedInit$body.apply(CarregaPedidos.scala:6)
[error] 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
[error] 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
[error] 	at scala.App$$anonfun$main$1.apply(App.scala:76)
[error] 	at scala.App$$anonfun$main$1.apply(App.scala:76)
[error] 	at scala.collection.immutable.List.foreach(List.scala:392)
[error] 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
[error] 	at scala.App$class.main(App.scala:76)
[error] 	at ml.freight.prepare.CarregaPedidos$.main(CarregaPedidos.scala:6)
[error] 	at ml.freight.prepare.CarregaPedidos.main(CarregaPedidos.scala)
[error] 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] 	at java.lang.reflect.Method.invoke(Method.java:498)

DML query drop and create table takes time

I tried to drop and create a table using runDMLQuery(), but the drop and create statements take more than 2 minutes. Please check the following log:

19/02/11 12:33:03 INFO com.samelamin.spark.bigquery.BigQueryClient: Executing DML Statement DROP TABLE IF EXISTS `projectId.dataset.input`
19/02/11 12:33:03 INFO com.samelamin.spark.bigquery.BigQueryClient: Using legacy Sql: false
19/02/11 12:35:15 INFO com.samelamin.spark.bigquery.BigQueryClient: Executing DML Statement CREATE TABLE IF NOT EXISTS `projectId.dataset.input` (id STRING,evnt STRING)
19/02/11 12:35:15 INFO com.samelamin.spark.bigquery.BigQueryClient: Using legacy Sql: false

Is it possible for the create and drop table statements to execute in less time?

checkpointing when writing in BigQuery?

I have a question about writing to BigQuery.

Apparently checkpointing is in place while writing data to BigQuery.

However, data is written twice when the Spark context is stopped and resumed. It seems that the state is not saved.

The source code is essentially as follows:

val spark = SparkSession.builder
    .master("local[2]")
    .appName("ParquetToBQIngestion")
    .config("spark.streaming.stopGracefullyOnShutdown","true")
    .getOrCreate()

val df = spark
    .readStream
    .option("checkpointLocation", "./data/checkpoint")
    .schema(schema)
    .parquet("<DIRECTORY_WITH_A_STREAM_OF_PARQUET_FILES>")

  val stream = df.writeStream
    .option("checkpointLocation", "./data/checkpoint")
    .option("tableReferenceSink","<TABLE>")
    .format("com.samelamin.spark.bigquery")
    .outputMode("append")
    .start()
    .awaitTermination()

BQ Table Schema - Maintaining Original Case of Spark DF Schema

Hi Sam,

Is it possible to retain the original case for the generated BQ table schema? Currently it makes all fields lowercase (customeraccountid) when the original is camel case (customerAccountId) - this makes it quite hard for some of the business people to read. Is it possible at all to retain the original case?

Cheers
K

Write to bigquery using DataframeWriter

Is there any problem with implementing writes to BigQuery using DataFrameWriter?

What do you guys think about implementing it like the Redshift lib?

https://github.com/databricks/spark-redshift/blob/master/src/main/scala/com/databricks/spark/redshift/DefaultSource.scala#L68

I want to use the spark like other formats:

df.write.format("com.samelamin.spark.bigquery").option("writeDisposition", "WRITE_TRUNCATE").save("bigquery-dw:poc_data_pipeline.pricing")

This would help me use this library with the same interface that is already built into my project.

Is there any other way to achieve this?

java.io.IOException: Not found: Uris gs

Hi Sam,

I've been running my streaming ETL jobs (non-watermark ones) in Databricks UAT for about 1.5 weeks.

For the most part everything is working fine, however I do get the occasional IO error against the GCS bucket, which is odd.

java.io.IOException: Not found: Uris gs://sdv-analytics-spark-streaming/hadoop/tmp/spark-bigquery/spark-bigquery-1498172713848=666506084/*

I think this is possibly because of my config / job setup rather than application/transformation logic.

I'm going to do some investigating today, but thought to raise it with you in case you had seen similar exceptions, or it's obvious my job setup is wrong.

So one thing to point out: in a given SS job, I have 1 streaming input DF (reading input events off Kinesis), but I stream to 3 targets (doing some transformations in the middle) - 2 separate BigQuery tables and 1 S3 bucket sink - so 3 StreamingQueries in total in a single SS job.

For the 2 BQ target tables in my BQ config, I am using a shared GCS storage folder within the job as the intermediary staging area for SS, NOT individual ones.

spark.sqlContext.setBigQueryGcsBucket(sdv-analytics-spark-streaming)

I'm wondering whether, with this IOError, having 2 BQ StreamingQueries from the same single input source DF in the same job is causing this, and whether the GCS folder suffix part ...

spark-bigquery-1498172713848=666506084

...is wrongly shared as a staging area (as maybe it's derived from the input source)? And does one StreamingQuery clean this up while the other one expects it to be there? Or maybe I'm barking up the wrong tree!!

Should each target StreamingQuery have its OWN GCS bucket rather than a shared one, because they are working off the same input stream?

e.g spark.sqlContext.setBigQueryGcsBucket(sdv-analytics-spark-streaming-)

The Spark session though is unique per job and not shared, so it seems I would need to break the 1 job up into 3 to do this given the shared context - 1 input to 1 sink instead of 1 input to 3?

This could be because of something else, e.g. intermittent network issues?

When I restarted the job it picked up where it left off and didn't error, so it's intermittent.

Does this make sense? Should I refactor to 1:1 input to BQ output?

Sorry it's a bit long-winded!

Thanks heaps!

Main part of the trace in logs below:

java.io.IOException: Not found: Uris gs://sdv-analytics-spark-streaming/hadoop/tmp/spark-bigquery/spark-bigquery-1498172713848=666506084/*
at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95) at com.samelamin.spark.bigquery.BigQueryClient.com$samelamin$spark$bigquery$BigQueryClient$$waitForJob(BigQueryClient.scala:143) at com.samelamin.spark.bigquery.BigQueryClient.load(BigQueryClient.scala:110) at com.samelamin.spark.bigquery.package$BigQueryDataFrame.saveAsBigQueryTable(package.scala:163) at com.samelamin.spark.bigquery.streaming.BigQuerySink.addBatch(BigQuerySink.scala:27) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:658)

Utilize Bigquery Storage API

With the beta officially announced today, there is an opportunity to leverage the BigQuery Storage API for reading tables from BQ. In theory it should have lower latency than GCS dumps, and it should be able to leverage predicate pushdown and column projection while also being Avro based.

Are there any plans to integrate the Storage API with this or another Spark DataFrame project?

Using spark-bigquery connector in AWS EMR Zeppelin

Hi.

I am trying to use this connector in an AWS EMR cluster. I just downloaded the jar file from here - https://mvnrepository.com/artifact/com.github.samelamin/spark-bigquery_2.11/0.2.4 - and placed it in the /usr/lib/spark/jars folder.

Then I am trying to use it in the Zeppelin notebook Spark interpreter.

import com.samelamin.spark.bigquery._

// Set up GCP credentials
sqlContext.setGcpJsonKeyFile("/home/json/google_api_credentials.json")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("data-1349")
//sqlContext.setBigQueryGcsBucket("<GCS_BUCKET>")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("US")

And this is the error I am getting; can you please help with this?

java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
  at com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
  at com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
  at com.samelamin.spark.bigquery.BigQuerySQLContext.setBigQueryDatasetLocation(BigQuerySQLContext.scala:69)
  ... 59 elided
Caused by: java.lang.ClassNotFoundException: com.google.api.client.http.HttpRequestInitializer
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 62 more

Structured Streaming - Quota Limits and Trigger Interval

Hi Sam,

Just wanted some advice based on your experience with SS and BQ quota limit management.

I just noticed that the daily limit for BigQuery load jobs is:

'1,000 load jobs per table per day (including failures), 50,000 load jobs per project per day (including failures)'

As there are 1,440 minutes in a day, this would suggest the trigger interval on the streaming query needs to be >= 2 minutes, equating to 720 incremental executions. Does that seem right to you? That assumes of course that data is constantly arriving; some intervals may not have data at all and thus no load job is created.

When the new watermarking feature arrives (can't wait!), I intend to use a watermark of 10 minutes anyway (for deduping), so the above won't be an issue, but I just wanted to confirm, as currently I've no trigger interval set so it just constantly processes once a read is done.

CSV 2 MB (row and cell size) - this seems to indicate that a given DataFrame row can't be bigger than 2 MB, which is fine. Our biggest are about 300 KB.

Anything else on this that is a gotcha I should think about?

Cheers mate!

Spark 2.1 - Structured Streaming Watermarking Exception

Hi guys,

I am trying to use the structured streaming BQ sink to execute this sliding window:

val orderCreatedEventsSlidingWindowStreamingDF =
      orderCreatedEventsStreamingInputDF
        .withColumn("timestamp",date_format($"event.eventOccurredTime", "YYY-MM-dd'T'hh:mm").cast("timestamp"))
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
          window($"timestamp","10 minutes","5 minutes")
        )
        .count()

orderCreatedEventsSlidingWindowStreamingDF
  .writeStream
      .option("checkpointLocation", "/mnt/streaming-checkpoints")
      .option("tableReferenceSink",s"$bigQueryProjectId:$sdvDataset.$orderCreatedSlidingWindowByHourStreamingTable")
      .format("com.samelamin.spark.bigquery")
      .outputMode(OutputMode.Append)
      .start()

...(note the 'append' mode needed to have watermarks, and some other boilerplate code is left out above) and have run into this exception when setting a watermark for my StreamingQuery.

java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#364: timestamp, interval 10 minutes

Firstly, is there anything wrong with my watermarking code? It appears right to me. Has anyone created a sliding window similar to this in Append mode and got it to work with this library?

Secondly, if I change this to 'complete' mode (I do want the watermarked sliding window in append mode, but am playing around with all modes), I can get data streamed into BQ. If I remove the watermark and set to 'Complete' mode, I get the sample results in the link:

https://www.dropbox.com/s/8kvct3oftbu3e9n/Screenshot%202017-05-12%2014.35.26.png?dl=0

However the data being written to BQ seems to be appended, not overwritten (refer to rows 4 and 5 for the same window but different BQ load times).

I'd expect there to only be the same bq_load_timestamp at any given time in this BQ table (the last load time) and only ever 1 unique row per window? This indicates neither of those holds true.

Am I missing something here? I'd appreciate your thoughts.

(note: I'm using 2.2 RC2 to access the soon-to-be-GAed Kinesis stream connector, but this shouldn't matter)

Thanks!

Table Region not EU

Hi Sam,

My BQ tables created from this library (Create Disposition is CREATE_IF_NEEDED) do not seem to recognise the 'EU' region setting and they all end up with 'Data Location' as 'US' in BQ when looking at the Table details.

spark.sqlContext.setBigQueryDatasetLocation("EU")

Any ideas?

Cheers!

Problems with Guava version using spark 2.2

Hi Sam! How are you?

I have been trying to use your library to write in batch mode into a BigQuery table on a local Spark 2.2. I'm having problems with the infamous java.lang.NoSuchMethodError: com.google.common.base.Splitter.splitToList(Ljava/lang/CharSequence;)Ljava/util/List;

The latest version of spark-bigquery is compatible with Spark 2.2, right?

Also I tried to shade the google.common.base dependency with the Maven Shade plugin, but it didn't work.

Do you have any idea what could be happening?

Custom partitioning of historic data using table decorators

Hi Sam,

As a one-off, I am migrating 8 years' worth of historic data, ingesting flat files and transforming to the new model, with the idea of writing this data (statically - not streamed) to a number of partitioned tables in BQ. I've been chatting to some of the BQ guys who suggested table decorators to enable setting a custom partition and thus get the performance gain in queries in BQ (note: this is actually needed in streaming too, so that true event time is used and not ingest time... but this issue is for a static load).

Using the APIs, I need to:

  1. Create a new partitioned table 'order_totals_all_currencies' - I was hoping the APIs can create it if it doesn't exist like normal and autodetect / generate the BQ schema (very cool feature BTW!), but also recognise the table decorator usage, e.g. "order_totals_all_currencies$20170717".

  2. Then write the data, using table decorators to guide the data to the correct event-time partition; as mentioned it's historical and needs to be saved using a partition date set in my Spark job to be when the order was placed, e.g. 20150301. So I would be iterating, doing multiple 'saveAsBigQueryTable' calls per day with a different table decorator.

e.g

df20160101.saveAsBigQueryTable("order_totals_all_currencies$20160101",true,0)
df20160102.saveAsBigQueryTable("order_totals_all_currencies$20160102",true,0)
df20160103.saveAsBigQueryTable("order_totals_all_currencies$20160103",true,0)

Is this possible, and what is the best way to do it? Creating the table and schema by hand first would take some time (some large schemas); I was hoping the library could handle this scenario and the initial create that has decoration? Is there a way I am missing perhaps?

I was trying below as a test

val bqOrderTotalsAllCurrenciesTable = "order_totals_all_currencies$20170717"

orderTotalsAllCurrenciesHistory20170717DF.saveAsBigQueryTable(s"$bqProjectId:$bqSdvPreAggsDataset.$bqOrderTotalsAllCurrenciesTable",true, 0)

but got this error from Google:

'"Invalid table ID "order_totals_all_currencies$20170717". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.",

I thought it might create the table (just the first time of course) AND then load the partitioned data subsequently, using the date decorator for the groups of orders.

Making saveAsBigQueryTable aware of the decorator is the key thing I need.

I'd actually want to be able to do this in streaming too, to enable true event time on streams where we get data arriving late.

Thoughts?

Cheers

"Couldn't match type NullType"

Hi Sam,

I am trying to save a dataframe and get an odd 'NullType' error on schema conversion:

"Couldn't match type NullType"

which comes from this bit of code, as you will know:

case _ => throw new RuntimeException (s"Couldn't match type $dataType")

The DataFrame is valid; I can print and see the schema fine. It seems all the types in the DF schema are covered in the conversion, as you'd expect :)

Have you come across this before? The dataframe is completely valid of course, meaning every column is valid and has a type. I'll keep investigating anyway, but thought to ask in case you have seen this and know what the root cause might be.

This is on Databricks Runtime 3.0, using the new Spark 2.2 BTW.

Error when changing zone to something other than EU/US

When you set the zone to the following value: europe-north1, I get the error listed below. Is this a bug, or are zones just not supported (yet)?

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Job project-name:project-name-ecc57416-1698-45b4-a1d3-cb7d2c88c356",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Job project-name:project-name-ecc57416-1698-45b4-a1d3-cb7d2c88c356",
  "status" : "NOT_FOUND"
}
	at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
	at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
	at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
	at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
	at com.google.cloud.hadoop.util.ResilientOperation$AbstractGoogleClientRequestExecutor.call(ResilientOperation.java:164)
	at com.google.cloud.hadoop.util.ResilientOperation.retry(ResilientOperation.java:64)
	at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95)
	at com.samelamin.spark.bigquery.BigQueryClient.com$samelamin$spark$bigquery$BigQueryClient$$waitForJob(BigQueryClient.scala:153)
	at com.samelamin.spark.bigquery.BigQueryClient.load(BigQueryClient.scala:116)
	at com.samelamin.spark.bigquery.BigQueryDataFrame.saveAsBigQueryTable(BigQueryDataFrame.scala:43)

Feature Request: Spark MapType mapping to BQuery

Hi Sam,

Firstly, loving the library, thanks! :) Also enjoyed both your first blog post on building robust pipelines (second post soon hopefully :) and your commentary on Spark testing on the hangouts chat with Holden. Spark testing is for sure challenging, as I'm finding; I'm fairly new to Spark / Scala so am still finding my feet, but enjoying it. What are the best resources / examples for Spark (Scala) testing you've come across?

Unfortunately I've been ill for the last week and have lost time on my project, and won't be able to submit a PR for adding MapType conversion to BQ to do it properly anytime soon (as mentioned I'm quite new to this and not the quickest). If you get time to do that it would of course be very much appreciated, but I understand you yourself are busy. Otherwise I'll look at it in a few months' time, post go-live.

Anyway wanted to update you, say thanks and keep up the good work.
Kurt

PySpark: Method saveAsBigQueryTable does not exist

Hi, I can't run the example in PySpark, because I get the error in the title.

sc = SparkSession.builder \
    .appName("PySpark To BigQuery: Publication test") \
    .getOrCreate()

sqlContext = SQLContext(sc)

bigquery = sc._sc._jvm.com.samelamin.spark.bigquery

# Prepare the bigquery context
bq = bigquery.BigQuerySQLContext(sc._wrapped._jsqlContext)
# bq.setGcpJsonKeyFile(KEY_FILE)
bq.setBigQueryProjectId(BQ_PROJECT_ID)
bq.setGSProjectId(BQ_PROJECT_ID)
bq.setBigQueryGcsBucket(STAGING_BUCKET)
bq.setBigQueryDatasetLocation(DATASET_LOCATION)

df...

BigQueryDataFrame = bigquery.BigQueryDataFrame(df._jdf)
BigQueryDataFrame.saveAsBigQueryTable("{0}:{1}.{2}".format(BQ_PROJECT_ID, BQ_DATASET_ID, BQ_TABLE_NAME))

py4j.Py4JException: Method saveAsBigQueryTable([class java.lang.String]) does not exist

Pyspark complete code

Hi Sam,
This is an amazing library that you built; however, I was not able to use it in Python. Can you please share a complete code example of how to use this from Python/PySpark?

Regards
Sanjay

runDMLQuery on AWS Glue throws an exception because of Jersey libraries conflict

When using the runDMLQuery method on AWS Glue, it throws an exception:

18/10/30 23:30:49 WARN ServletHandler: Error for /api/v1/applications/application_1540940741469_0002
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:158)
at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:640)
at org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:496)
at org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:788)
at org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:773)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:578)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:524)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)

In addition to the exception, the job freezes for quite a while. I believe this is caused by a conflict of Jersey libraries on the classpath. Any ideas how this could be solved or worked around?
Thanks in advance.

Spark MapType conversion not supported?

Hi,

I have a MapType struct in my schema, and I am getting the error below when converting - I assume this isn't supported? Are there any (soonish) plans to support it?

Thanks!

java.lang.IllegalArgumentException: Unsupported type: MapType(StringType,BooleanType,true)
at com.samelamin.spark.bigquery.converters.SchemaConverters$.typeToJson(SchemaConverters.scala:73) at com.samelamin.spark.bigquery.converters.SchemaConverters$.com$samelamin$spark$bigquery$converters$SchemaConverters$$fieldToJson(SchemaConverters.scala:81) at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$typeToJson$2.apply(SchemaConverters.scala:64) at com.samelamin.spark.bigquery.converters.SchemaConverters$$anonfun$typeToJson$2.apply(SchemaConverters.scala:64) at ....

Feature Request: DML Operations - Update, Delete Support

Hi Sam,

Hope you are having a great holiday! Thought I'd add this to track, as we discussed, and you thought it simple enough to do.

As discussed, it would be great to be able to invoke these ad hoc DML commands when required. The use cases include things like hygiene / cleaning up data via periodic Spark jobs (e.g. periodic removal of any duplicates, or updating a corrupted column such as bq_load_timestamp from a previous bug).

They don't return a DataFrame in the classical sense; these are more for jobs that help to maintain data quality.

Cheers mate!

ISSUE: bq_load_timestamp corrupted

Hi Sam,

Description:

The latest release, 0.18, which fixed event time watermarking, may have corrupted the BQ load timestamp.

The error below is returned by BQ indicating an invalid timestamp in bq_load_timestamp (note: the streaming queries that populate this DO NOT use watermarking BTW).

I now need to add EXCEPT(bq_load_timestamp) to my queries to stop them erroring (will need to patch these in the future). Refer below:

For some tests on streams that DO use the new watermarking feature, the timestamp in the sink tables is the beginning of the epoch rather than the clock time.

image-1

Let me know if you need anymore info
Kurt

Bug: Streaming Window does not write window.StartTime and window.endTime

Hi Sam,

Hope you are still enjoying some of the relaxation of your hols and are well rested!

Quick one: I've been trying to build a streaming dataframe using windowing and watermarking. I'm getting data written to BQ, however in the target table and schema (and thus the data written) produced in BQ, there is NO window start and end time, which Spark usually produces itself when applying the windowing function, e.g.

        .withWatermark("timestamp", "10 minutes")
        .groupBy(
          window($"timestamp","10 minutes","5 minutes")
        )

Ref screenshot below:

screenshot 2017-08-11 21 23 07

Data does get written to BQ, however I don't have the usual window start and end time columns, which grouping by a window function normally gives.

Cheers
Kurt

Exception when bigQuerySelect Returns Zero Rows

I am getting the below exception when the BigQuery select returns zero records:

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bigQuerySelect(BigQuerySQLContext.scala:92)

BQ Streaming Inserts

Hi Sam,

You mentioned that if more people requested streaming inserts for Structured Streaming you'd consider implementing it.

What if I commissioned your time to do this when you are feeling better? It does cost, but at 5 cents per GB, for the benefit of lower latency, cutting out GCS, and having duplicates handled gracefully in the sink (bit of a problem for me at the moment), this would be an awesome feature. And no one else is doing it for Spark (that I know of at least).

Just a thought; of course just decline if it's of no interest, no problem.

Regardless, hope you are feeling better mate and the eye is recovering well!

Take it easy
Kurt

java.lang.ClassNotFoundException: java.lang.ProcessEnvironment$Variable

Hi Sam,
I'm facing this issue. I think this could be a versioning issue. I am using the same PySpark code you gave in another issue which I had raised. Have you ever faced this, or do you have any idea about it?

java.lang.ClassNotFoundException: java.lang.ProcessEnvironment$Variable
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:260)
at com.samelamin.spark.bigquery.utils.EnvHacker$.setEnv(EnvHacker.scala:15)
at com.samelamin.spark.bigquery.BigQueryClient$.setGoogleBQEnvVariable(BigQueryClient.scala:51)
at com.samelamin.spark.bigquery.BigQueryClient$.getInstance(BigQueryClient.scala:34)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bigQuerySelect(BigQuerySQLContext.scala:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
File "C:/Users/IT000493/PycharmProjects/sc-ln/pyspark-examples/hello_pyspark.py", line 30, in

File "D:\software\spark-2.2.0-bin-hadoop2.6\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in call
File "D:\software\spark-2.2.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "D:\software\spark-2.2.0-bin-hadoop2.6\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.bigQuerySelect.
: java.lang.NoSuchMethodError: com.google.common.base.Splitter.splitToList(Ljava/lang/CharSequence;)Ljava/util/List;
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase$ParentTimestampUpdateIncludePredicate.create(GoogleHadoopFileSystemBase.java:655)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createOptionsBuilderFromConfig(GoogleHadoopFileSystemBase.java:2005)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1697)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:878)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:841)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.getTemporaryPathRoot(BigQueryConfiguration.java:283)
at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getSplits(AbstractBigQueryInputFormat.java:114)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bigQuerySelect(BigQuerySQLContext.scala:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

saveAsBigQueryTable: Schemas don't match?

Hi,

Thanks for your help on the previous issue.

I'm getting the error below when trying to save a very simple dataframe (4 strings and 1 boolean - simple!) into a BigQuery table with matching column names, types, and even the order (does the order of DataFrame cols need to match the order of the BigQuery table? I assume not, but I have the same order anyway).

dframe.flattenedEvent.saveAsBigQueryTable(...)

It's such a simple example, but I cannot get it to work and get the error below - any ideas? I can read from other tables; it's just the writing/saving that's the issue.

java.io.IOException: Provided Schema does not match Table ******:sdv.event
at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95) at com.samelamin.spark.bigquery.BigQueryClient.com$samelamin$spark$bigquery$BigQueryClient$$waitForJob(BigQueryClient.scala:143) at com.samelamin.spark.bigquery.BigQueryClient.load(BigQueryClient.scala:110) at com.samelamin.spark.bigquery.package$BigQueryDataFrame.saveAsBigQueryTable(package.scala:163)
.......

Do you actually have a sample public Databricks notebook by any chance you can share? No probs if not.

Cheers and thanks
Kurt

Class cast exception occurs (Double cannot be cast to Float)

Hi, I'm trying to analyze Firebase data using Spark with this spark-bigquery connector, but a class cast exception occurred: Double cannot be cast to Float.
Additionally, the Double type exists in the Avro specs, but it seems the module only casts to the Float type. (https://avro.apache.org/docs/1.8.1/spec.html)

Would you mind telling me whether this is a bug?

https://support.google.com/firebase/answer/7029846

Error Detail

  • command
val df = spark.sqlContext.read.format("com.samelamin.spark.bigquery")
  .option("tableReferenceSource","xxxx:yyy.app_events_intraday_20180417")
  .load()
df.printSchema
  • output
root
 |-- user_dim: struct (nullable = true)
 |    |-- user_id: string (nullable = true)
 |    |-- first_open_timestamp_micros: long (nullable = true)
 |    |-- user_properties: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |-- string_value: string (nullable = true)
 |    |    |    |    |    |-- int_value: long (nullable = true)
 |    |    |    |    |    |-- float_value: float (nullable = true)
 |    |    |    |    |    |-- double_value: float (nullable = true)
 |    |    |    |    |-- set_timestamp_usec: long (nullable = true)
 |    |    |    |    |-- index: long (nullable = true)
 |    |-- device_info: struct (nullable = true)
 |    |    |-- device_category: string (nullable = true)
 |    |    |-- mobile_brand_name: string (nullable = true)
 |    |    |-- mobile_model_name: string (nullable = true)
 |    |    |-- mobile_marketing_name: string (nullable = true)
 |    |    |-- device_model: string (nullable = true)
 |    |    |-- platform_version: string (nullable = true)
 |    |    |-- device_id: string (nullable = true)
 |    |    |-- resettable_device_id: string (nullable = true)
 |    |    |-- user_default_language: string (nullable = true)
 |    |    |-- device_time_zone_offset_seconds: long (nullable = true)
 |    |    |-- limited_ad_tracking: boolean (nullable = true)
 |    |-- geo_info: struct (nullable = true)
 |    |    |-- continent: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |-- app_info: struct (nullable = true)
 |    |    |-- app_version: string (nullable = true)
 |    |    |-- app_instance_id: string (nullable = true)
 |    |    |-- app_store: string (nullable = true)
 |    |    |-- app_platform: string (nullable = true)
 |    |    |-- app_id: string (nullable = true)
 |    |-- traffic_source: struct (nullable = true)
 |    |    |-- user_acquired_campaign: string (nullable = true)
 |    |    |-- user_acquired_source: string (nullable = true)
 |    |    |-- user_acquired_medium: string (nullable = true)
 |    |-- bundle_info: struct (nullable = true)
 |    |    |-- bundle_sequence_id: long (nullable = true)
 |    |    |-- server_timestamp_offset_micros: long (nullable = true)
 |    |-- ltv_info: struct (nullable = true)
 |    |    |-- revenue: float (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |-- event_dim: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- params: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |-- string_value: string (nullable = true)
 |    |    |    |    |    |-- int_value: long (nullable = true)
 |    |    |    |    |    |-- float_value: float (nullable = true)
 |    |    |    |    |    |-- double_value: float (nullable = true)
 |    |    |-- timestamp_micros: long (nullable = true)
 |    |    |-- previous_timestamp_micros: long (nullable = true)
 |    |    |-- value_in_usd: float (nullable = true)
  • command
import org.apache.spark.sql.functions._
df.show
  • output
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9, 10.228.249.82, executor 0): java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Float

Key File not found

Hiya,

Giving this a whirl on Databricks, I ran into a small first bump; hoping you can help. The JSON key file is definitely there, as I mount the dir (from S3 where it lives) to DBFS and also do a head of the JSON, which has contents of course.

However I get the below when running the code (refer to the Dropbox link):

java.io.IOException: Error reading credential file from environment variable GOOGLE_APPLICATION_CREDENTIALS, value 'dbfs:/mnt/analytics/config/spark-bigquery-poc-b20523290ff4.json': File does not exist.

https://www.dropbox.com/s/782obtqfn1x6877/Screenshot%202017-04-12%2016.28.24.png?dl=0

Any ideas? I'm new to Databricks, so it might be something obvious perhaps?

Cheers!

setBigQueryDatasetLocation breaks YARN mode

Thanks for the library, seems to be working great!

I ran into a very odd issue that took me far too long to figure out. I'm running Spark 2.1 on Qubole (on AWS), and after getting everything working (my query would complete, etc.) I was still having the job fail at the end with a particularly odd error:

org.apache.spark.SparkException: YarnSparkHadoopUtil is not available in non-YARN mode!
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:131)
    at org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:96)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:220)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1994)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

Doing some binary-search commenting-out of code to figure out where the problem was, I narrowed it down to this line:

spark.sqlContext.setBigQueryDatasetLocation(bqConfig.getString("datasetLocation"))

(note that config value resolves to "US" which should also be the default)

Removing that line, everything stays in YARN mode and works just fine. Given that the default works fine for my use case, I didn't spend more time tracking down what might be going on, but figured I'd report the issue.

Thanks again!

Minimal working example

Hi Sam,

being new to Spark, I'm wondering whether you happen to have a minimal working example, including a complete set of dependencies.

Best,
Grisha

Exception in thread "main" java.lang.NoSuchMethodError:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
at com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
at com.samelamin.spark.bigquery.BigQueryDataFrame.saveAsBigQueryTable(BigQueryDataFrame.scala:40)

Can you please tell me from which library you are getting 'checkArgument'?

java.lang.ClassCastException: java.lang.Long cannot be cast to java.sql.Timestamp

I am trying to work with bigquery-public-data:chicago_crime.crime using Spark and BigQuery, and I am facing the exception below with the samelamin/spark-bigquery package:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.sql.Timestamp
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)

Is this a known bug?
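Not a fix in the connector itself, but a possible hedged workaround while this is open: select the timestamp column pre-converted on the BigQuery side so Spark never has to coerce a Long into a Timestamp, then rebuild the timestamp in Spark. The column names below are from the public chicago_crime.crime table as I remember them, and the legacy-SQL INTEGER() behaviour is an assumption, so please verify both:

import com.samelamin.spark.bigquery._
import org.apache.spark.sql.functions.col

// Legacy SQL INTEGER(ts) should yield microseconds since the epoch (assumption);
// dividing by 1,000,000 gives seconds, which cast cleanly to a Spark timestamp.
val df = sqlContext.bigQuerySelect(
  "SELECT unique_key, INTEGER(date) AS date_micros FROM [bigquery-public-data:chicago_crime.crime]")

val withTimestamp = df.withColumn("date", (col("date_micros") / 1000000L).cast("timestamp"))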

Bug: Null type not supported error when setting a null value to a DF column

Hi Sam,

I think this might be a bug.

I'm migrating some existing data in Spark SQL, and if I want to set a null on a DataFrame column due to some conditions in the data, I cannot use null explicitly.

e.g.

null AS lastUpdateEventId

I get a 'NullType not supported' error from the schema converter; it seems it doesn't like this.

If I then change this to an empty string, it does work.

e.g.

"" AS lastUpdateEventId

All works fine. I can live with the empty string but thought I'd raise this.
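In case it helps anyone else, a hedged alternative to the empty-string workaround is to give the null an explicit type, so the schema converter never sees NullType (df here stands in for your own DataFrame):

import org.apache.spark.sql.functions.lit

// DataFrame API: a typed null instead of an untyped NullType column
val fixed = df.withColumn("lastUpdateEventId", lit(null).cast("string"))

// Or directly in the SQL text:
//   CAST(NULL AS STRING) AS lastUpdateEventId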

My pipeline is much more stable, up for a week continuously now. Thanks soooo much for your help mate! :)

K

Reading a table a second time shows the error "Conflict occurred creating export directory"

I tried the following code:

val df = spark.sqlContext.read.format("com.samelamin.spark.bigquery").option("tableReferenceSource","p1:test.tname").load()
df.show

The first time it shows the result properly, but when I try the same code a second time:

val df = spark.sqlContext.read.format("com.samelamin.spark.bigquery").option("tableReferenceSource","p1:test.tname").load()
df.show

it shows the following error:

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
at com.samelamin.spark.bigquery.BigQuerySQLContext.bigQueryTable(BigQuerySQLContext.scala:129)
at com.samelamin.spark.bigquery.BigQueryRelation.buildScan(BigQueryRelation.scala:15)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:298)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
... 50 elided
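A hedged workaround, in line with the caching trick mentioned in another issue further down: load the table once, cache it, and reuse that DataFrame instead of calling load() a second time, so the connector is never asked to recreate the same export directory.

val df = spark.sqlContext.read
  .format("com.samelamin.spark.bigquery")
  .option("tableReferenceSource", "p1:test.tname")
  .load()
  .cache()

df.count()  // force full materialisation so later actions hit the cache
df.show()
df.show()   // served from the cache, no second BigQuery export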

How does the package write data to BigQuery?

Reviewing the source code, I understand that Spark DataFrames are written to GCS as JSON files and then loaded from GCS into BigQuery. Can you please confirm this?

I just want to confirm that I am not using streaming inserts into BigQuery, which are priced at $0.05 per GB.

Thank you

File not found + HiveContext Issue

Hi,

I'm trying to use the connector you wrote to read/write from BigQuery on Databricks. I very much appreciate the effort to make Databricks and BigQuery play nicely together. I'm trying to get this to work on a minimal Databricks cluster (just one driver and one worker), using Spark 2.1 and Scala 2.11 (not sure if those are requirements, but they seemed like safe bets).

I've followed the instructions in Databricks.md to install the library successfully and modified my init script to copy the credentials file from S3 to /home/ubuntu/databricks/filename.json. After setup, I am able to import the module correctly by running the code provided in the markdown file.

The issue comes when I try to read a table. Based on the suggestion in readme.md I'm trying:

val table = spark.sqlContext.bigQueryTable("project:dataset.table")

and I get the following response:

<console>:37: error: value bigQueryTable is not a member of org.apache.spark.sql.SQLContext
       val table = spark.sqlContext.bigQueryTable("project:dataset.test")

The same thing happens if I use a public dataset or if I copy a public table into my own dataset.

I'm pretty new to Scala (I mostly use PySpark), so I could be missing something obvious here, but shouldn't the module be adding bigQueryTable to the SQL/Hive context?
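One hedged guess at the cause: bigQueryTable is added to SQLContext via an implicit conversion, so the package import has to be in scope in the same notebook cell where the method is called; without it the compiler reports exactly "value bigQueryTable is not a member of org.apache.spark.sql.SQLContext".

import com.samelamin.spark.bigquery._

// With the implicits in scope, the extension method resolves.
val table = spark.sqlContext.bigQueryTable("project:dataset.table")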

Structured Streaming schema updates support timeline

Hi Sam,

You mention that this is not currently supported - is there a timeline for this at all?

Our schemas will almost certainly have attributes added over time after our initial launch (e.g. more order attributes; these would not be breaking changes, mostly just additive). Without schema updates through the stream, such schema evolution becomes more difficult, both in terms of managing the streaming job and in terms of BQ table management (e.g. replaying full historic streams again with the new schema and overwriting, which feels like overkill).

It would be great to hear how you currently manage this at your company.

Cheers once again!
Kurt

P.S. I'm going away this weekend; can we reschedule to sometime next week? Cheers mate

Import Error with Databricks from a table with streaming

Hi,

In November of last year my colleague mayankshah891 raised an issue (#48). We are trying to import data from a BigQuery table that data is streaming into, and we randomly get errors like the ones described in that issue.

I have noticed that printing the (almost) full table after caching it helps:

val table = sqlContext.bigQueryTable("bigqueryprojectid:blabla.name_table").cache()
table.show(100000)

It probably forces Spark to persist the table so that no further connection to BigQuery is required.

We closed issue #48 after mentioning that data was streaming into our BigQuery table, which seemed to explain our problem. New data is coming in quite frequently (at least every 5 minutes).

Could you confirm that this is what causes our problem and do you have a more scientific way of getting around it?

Thank you so much for your help. Truly appreciated.

Export FS must derive from GoogleHadoopFileSystemBase

Hello,

I'm trying to process some data from BigQuery using a local cluster and then write it to HDFS. I keep getting the following error:

java.lang.IllegalStateException: Export FS must derive from GoogleHadoopFileSystemBase.
    at com.google.common.base.Preconditions.checkState(Preconditions.java:456)
    at com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.getTemporaryPathRoot(BigQueryConfiguration.java:363)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getSplits(AbstractBigQueryInputFormat.java:126)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1378)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1377)
    at com.samelamin.spark.bigquery.BigQuerySQLContext.bigQuerySelect(BigQuerySQLContext.scala:96)

Sample code:

import com.samelamin.spark.bigquery._

spark.sqlContext.setBigQueryProjectId("XXXX")
spark.sqlContext.setBigQueryDatasetLocation("EU")
spark.sqlContext.setBigQueryGcsBucket("XXXXX")
spark.sqlContext.useStandardSQLDialect(true)

val table = spark
  .sqlContext
  .bigQuerySelect(
    """
      |SELECT a, b, c
      |FROM  `XXX.YYYY.ZZZZ`;
    """.stripMargin)

table.show

Thank you for your help.
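Not a confirmed fix, but a sketch of what could be tried: on a local (non-Dataproc) cluster the default filesystem is usually local/HDFS, which trips the GoogleHadoopFileSystemBase check, so registering the GCS connector and pointing the temporary export path at a gs:// location may satisfy it. The property names are from the Hadoop GCS/BigQuery connectors as I understand them, and the bucket is a placeholder, so please double-check against the versions on your classpath:

val hadoopConf = spark.sparkContext.hadoopConfiguration
// Register the GCS connector filesystem implementations.
hadoopConf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoopConf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
// Give BigQuery exports an explicit gs:// temp path (placeholder bucket).
hadoopConf.set("mapred.bq.temp.gcs.path", "gs://XXXXX/hadoop/tmp/bigquery")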

JSON parsing failed when using saveAsBigQueryTable

I was trying to load data into BigQuery using the sample code below:

import org.apache.spark.{SparkConf, SparkContext}
import com.samelamin.spark.bigquery._

val conf1 = new SparkConf().setAppName("App").setMaster("local[2]")
val sc = new SparkContext(conf1)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setGcpJsonKeyFile("gcskey.json")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("proj_name")
sqlContext.setBigQueryGcsBucket("gcsbucket")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("US")

// Load everything from a table
val table = sqlContext.bigQueryTable("bigquery-public-data:samples.shakespeare")

// Load results from a SQL query
// Only legacy SQL dialect is supported for now
val df = sqlContext.bigQuerySelect(
  "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")

// Save data to a table
df.saveAsBigQueryTable("my-project:my_dataset.my_table")

While executing the code, I got the error below when it tried to execute the last statement:

165037 [main] ERROR org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation - Aborting job.
java.io.IOException: Failed to parse JSON: Unexpected token; Parser terminated before end of string

Can somebody help me to resolve this issue?

Stuck with error py4j.protocol.Py4JJavaError: An error occurred while calling o39.saveAsBigQueryTable. : java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)

I'm using Amazon EMR with Hadoop 2 and Java 1.8, and I would like to stream data from Amazon EMR to BigQuery. I'm stuck with the following error:
File "/home/hadoop/pyjobs/py_script/s3_bigquery_0_1.py", line 58, in
bqDF.saveAsBigQueryTable("{0}:{1}.{2}".format(BQ_PROJECT_ID, BQ_DATA_SET, TABLE_NAME),False,0,bigquery.__getattr__("package$WriteDisposition$").__getattr__("MODULE$").WRITE_EMPTY(),bigquery.__getattr__("package$CreateDisposition$").__getattr__("MODULE$").CREATE_IF_NEEDED())
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/local/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.saveAsBigQueryTable.
: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
at com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
at com.samelamin.spark.bigquery.BigQueryDataFrame.saveAsBigQueryTable(BigQueryDataFrame.scala:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Command Line
spark-submit --packages com.github.samelamin:spark-bigquery_2.11:0.2.6,org.apache.hadoop:hadoop-aws:2.7.3,com.databricks:spark-csv_2.11:1.3.0 --jars /home/hadoop/pyjobs/jars/minimal-json-0.9.4.jar,/home/hadoop/pyjobs/jars/spark-bigquery-0.2.5.jar,/home/hadoop/pyjobs/jars/spark-bigquery-0.1.0-s_2.11.jar,/home/hadoop/pyjobs/jars/gcs-connector-hadoop2-latest.jar,/home/hadoop/pyjobs/jars/google-api-client-1.4.1-beta.jar,/home/hadoop/pyjobs/jars/guava-21.0.jar,,/home/hadoop/pyjobs/jars/google-api-services-bigquery-v2-rev92-1.14.2-beta.jar /home/hadoop/pyjobs/py_script/s3_bigquery_0_1.py

Usage with RStudio's sparklyr

I attempted to use your library with RStudio's sparklyr package. sparklyr's spark_read_source relies on the availability of a DefaultSource that implements org.apache.spark.sql.sources.RelationProvider. In a fork of your library I have extended DefaultSource as follows (short version):

class DefaultSource extends .... with RelationProvider {
  ...

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    val otherSqlName = sqlContext
    new BaseRelation {
      override def schema: StructType = getConvertedSchema(sqlContext, parameters)
      override def sqlContext: SQLContext = otherSqlName
    }
  }
}

The corresponding R/sparklyr code is as follows (using a locally published version of your library):

library(sparklyr)

# https://developers.google.com/identity/protocols/application-default-credentials
Sys.setenv("GOOGLE_APPLICATION_CREDENTIALS" = "credentials.json")

config <- spark_config()
# Modified version in local ivy cache
config[["sparklyr.defaultPackages"]] <- c("com.github.samelamin:spark-bigquery_2.11:0.2.2")

sc <- spark_connect(master = "local", config = config)
my_table <-
  spark_read_source(
    sc,
    name = "mytable",
    source = "com.samelamin.spark.bigquery",
    options = list(
      "tableReferenceSource" = "my-project:my_dataset.my_table",
    )
  )

When trying to execute the above I get the following exception:

Error: java.lang.AssertionError: assertion failed: No plan for Relation[name#10,latitude#11,longitude#12,bq_load_timestamp#13] com.samelamin.spark.bigquery.DefaultSource$$anon$1@429b3f9f

	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
	at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
	at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
	at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
	at org.apache.spark.sql.internal.CatalogImpl.cacheTable(CatalogImpl.scala:419)
	at org.apache.spark.sql.execution.command.CacheTableCommand.run(cache.scala:41)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sparklyr.Invoke$.invoke(invoke.scala:102)
	at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
	at sparklyr.StreamHandler$.read(stream.scala:62)
	at sparklyr.BackendHandler.channelRead0(handler.scala:52)
	at sparklyr.BackendHandler.channelRead0(handler.scala:14)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)

It seems to successfully connect and determine the schema but then fails with "No plan for Relation". Any idea what could be causing this?
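A hedged guess at the cause: the anonymous BaseRelation above only reports a schema, and Spark's DataSourceStrategy can only plan relations that also expose a scan interface (TableScan, PrunedScan, ...), hence the "No plan for Relation" assertion. A sketch of a plannable shape, with the fork's getConvertedSchema helper and the actual scan logic left as stubs (so this is not the connector's own implementation):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType

class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    val outerSqlContext = sqlContext
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = outerSqlContext
      override def schema: StructType = getConvertedSchema(outerSqlContext, parameters)
      // The planner needs buildScan; the idea is to delegate to the connector's
      // existing table-read path (what BigQueryRelation.buildScan already does).
      override def buildScan(): RDD[Row] = ??? // stub -- fill in with the fork's read logic
    }
  }

  // Stub for the helper already present in the fork.
  private def getConvertedSchema(sqlContext: SQLContext,
                                 parameters: Map[String, String]): StructType = ???
}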

Import Error with Databricks

I am attempting to read a table from BigQuery and write it to another database.

On my first few tries, I was able to read the table from BigQuery, but when I attempted to write said table to the database, I got the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 4 times, most recent failure: Lost task 1.3 in stage 13.0 (TID 41, 10.102.241.112, executor 0): java.lang.IllegalStateException: Found known file 'data-000000000002.avro' with index 2, which isn't less than or equal to than endFileNumber 0!

Then, I checked our server, and it had actually worked! So I figured never mind, let me run the process all the way through again.

However, now I can't even read the initial table from BigQuery, with the exact same code. It returns this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in stage 20.0 (TID 53, 10.102.249.174, executor 0): java.lang.IllegalStateException: Found known file 'data-000000000002.avro' with index 2, which isn't less than or equal to than endFileNumber 1!

Any clues? I'm using Databricks to import a table from BigQuery. The cluster I'm using is Databricks 3.3, which includes Spark 2.2.0 and Scala 2.11.

Thank you so much for your time and expertise
