
sb-miniproject8

Spark Optimization - Mini Project

Summary

Experiment settings:

  • Standalone Spark with one master node and two worker nodes on the same Ubuntu server.
  • Master node: 8 cores, 16 GB memory
  • Worker nodes: 8 cores, 1 GB memory each

Step 1: For each method, restart Spark and submit the Spark job using the following command:

spark-submit --master spark://192.168.0.2:7077 optimize.py

Step 2: Get the duration of the completed job from the Spark web UI (localhost:8080)

Step 3: Repeat steps 1 and 2 N times (N = 5 in this experiment)

Step 4: Collect the results and compute the average runtime
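Steps 3 and 4 amount to averaging the N measured durations. A minimal sketch (the values below are illustrative placeholders, not the actual measurements):

```python
# Durations (in seconds) read from the Spark web UI over N = 5 runs.
# These numbers are placeholders, not the measured results.
durations = [9.2, 8.8, 9.1, 9.0, 8.9]

avg_runtime = sum(durations) / len(durations)
print(f"average runtime: {avg_runtime:.1f} s")  # -> average runtime: 9.0 s
```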

The final results:

Method               Duration (s)
Original query       9
Changing join order  9
Using broadcast      9
Using cache          4.5
Cache + broadcast    5.4
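From the table, the speedup of each method relative to the original query can be computed directly:

```python
# Average durations (s) taken from the results table above
durations = {
    'Original query': 9.0,
    'Changing join order': 9.0,
    'Using broadcast': 9.0,
    'Using cache': 4.5,
    'Cache + broadcast': 5.4,
}

base = durations['Original query']
speedups = {method: base / t for method, t in durations.items()}

print(speedups['Using cache'])                   # -> 2.0 (caching roughly halves the runtime)
print(round(speedups['Cache + broadcast'], 2))   # -> 1.67
```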

Tuning with --num-executors and --executor-cores

We ran experiments with different settings of --num-executors and --executor-cores to understand how these values affect performance.

--num-executors   --executor-cores   Elapsed time (s)
      1                  1                 12
      2                  1                 12
      4                  1                 12
      8                  1                 12
      1                  2                  8
      2                  2                  7
      4                  2                  8
      8                  2                  7
      1                  4                  6
      2                  4                  5
      4                  4                  5
      8                  4                  5
      1                  8                  4
      2                  8                  5
      4                  8                  5
      8                  8                  4
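For reference, each row of the table corresponds to a spark-submit invocation of the following shape (the master URL and script name are the ones used above; the helper function is just for illustration):

```python
# Build the spark-submit command for one (num-executors, executor-cores) setting.
def submit_command(num_executors: int, executor_cores: int) -> str:
    return (
        "spark-submit --master spark://192.168.0.2:7077 "
        f"--num-executors {num_executors} --executor-cores {executor_cores} "
        "optimize.py"
    )

print(submit_command(4, 4))
# -> spark-submit --master spark://192.168.0.2:7077 --num-executors 4 --executor-cores 4 optimize.py
```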

Analysis

  • Changing the join order does not improve performance in this case, because the two tables are similar in size.
  • Broadcasting gives no improvement. We use a standalone setup with only two worker nodes, and the dataset is not large enough for broadcasting to pay off.
  • Caching the DataFrame improves performance nearly two-fold. Caching works well here because the dataset is small and, in this standalone setup, the master and worker nodes are on the same physical machine.

Screenshot of localhost:8080 when running PySpark (image1)

Screenshot of localhost:4040 when running the Scala Spark shell (image2)

Data model

Questions

root
 |-- question_id: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- creation_date: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- accepted_answer_id: long (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- views: long (nullable = true)
 
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|question_id|                tags|       creation_date|               title|accepted_answer_id|comments|user_id|views|
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|     382738|[optics, waves, f...|2018-01-28 01:22:...|What is the pseud...|            382772|       0|  76347|   32|
|     370717|[field-theory, de...|2017-11-25 03:09:...|What is the defin...|              null|       1|  75085|   82|
|     339944|[general-relativi...|2017-06-17 15:32:...|Could gravitation...|              null|      13| 116137|  333|
|     233852|[homework-and-exe...|2016-02-04 15:19:...|When does travell...|              null|       9|  95831|  185|
|     294165|[quantum-mechanic...|2016-11-22 05:39:...|Time-dependent qu...|              null|       1| 118807|   56|
|     173819|[homework-and-exe...|2015-04-02 10:56:...|Finding Magnetic ...|              null|       5|  76767| 3709|
|     265198|    [thermodynamics]|2016-06-28 09:56:...|Physical meaning ...|              null|       2|  65035| 1211|
|     175015|[quantum-mechanic...|2015-04-08 20:24:...|Understanding a m...|              null|       1|  76155|  326|
|     413973|[quantum-mechanic...|2018-06-27 08:29:...|Incorporate spino...|              null|       3| 167682|   81|
|     303670|[quantum-field-th...|2017-01-08 00:05:...|A Wilson line pro...|              null|       0| 101968|  184|
|     317368|[general-relativi...|2017-03-08 13:53:...|Shouldn't Torsion...|              null|       0|  20427|  305|
|     369982|[quantum-mechanic...|2017-11-20 21:11:...|Incompressible in...|              null|       4| 124864|   83|
|     239745|[quantum-mechanic...|2016-02-25 02:51:...|Is this correct? ...|            239773|       2|  89821|   78|
|     412294|[quantum-mechanic...|2018-06-17 19:46:...|Is electron/photo...|              null|       0|    605|   61|
|     437521|[thermodynamics, ...|2018-10-29 01:49:...|Distance Dependen...|              null|       2| 211152|   19|
|     289701|[quantum-field-th...|2016-10-29 22:56:...|Generalize QFT wi...|              null|       4|  31922|   49|
|     239505|[definition, stab...|2016-02-24 04:51:...|conditions for so...|              null|       3| 102021|  121|
|     300744|[electromagnetism...|2016-12-24 12:14:...|Maxwell equations...|            300749|       0| 112190|  171|
|     217315|[nuclear-physics,...|2015-11-08 03:13:...|Is the direction ...|              null|       1|  60150| 1749|
|     334778|[cosmology, cosmo...|2017-05-22 08:58:...|Why are fluctatio...|            334791|       3| 109312|  110|
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+

Answers

root
 |-- question_id: long (nullable = true)
 |-- answer_id: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- score: long (nullable = true)

+-----------+---------+--------------------+--------+-------+-----+
|question_id|answer_id|       creation_date|comments|user_id|score|
+-----------+---------+--------------------+--------+-------+-----+
|     226592|   226595|2015-12-29 17:46:...|       3|  82798|    2|
|     388057|   388062|2018-02-22 12:52:...|       8|    520|   21|
|     293286|   293305|2016-11-17 15:35:...|       0|  47472|    2|
|     442499|   442503|2018-11-22 00:34:...|       0| 137289|    0|
|     293009|   293031|2016-11-16 07:36:...|       0|  83721|    0|
|     395532|   395537|2018-03-25 00:51:...|       0|   1325|    0|
|     329826|   329843|2017-04-29 10:42:...|       4|    520|    1|
|     294710|   295061|2016-11-26 19:29:...|       2| 114696|    2|
|     291910|   291917|2016-11-10 04:56:...|       0| 114696|    2|
|     372382|   372394|2017-12-03 20:17:...|       0| 172328|    0|
|     178387|   178394|2015-04-25 12:31:...|       6|  62726|    0|
|     393947|   393948|2018-03-17 17:22:...|       0| 165299|    9|
|     432001|   432696|2018-10-05 03:47:...|       1| 102218|    0|
|     322740|   322746|2017-03-31 13:10:...|       0|    392|    0|
|     397003|   397008|2018-04-01 06:31:...|       1| 189394|    6|
|     223572|   223628|2015-12-11 23:40:...|       0|  94772|   -1|
|     220328|   220331|2015-11-24 09:57:...|       3|  92883|    1|
|     176400|   176491|2015-04-16 08:13:...|       0|  40330|    0|
|     265167|   265179|2016-06-28 06:58:...|       0|  46790|    0|
|     309103|   309105|2017-02-01 11:00:...|       2|  89597|    2|
+-----------+---------+--------------------+--------+-------+-----+

The original queries

Aggregate the Answers table

Count the number of answers grouped by question_id and month:

from pyspark.sql.functions import month, count

answers_month = answersDF.withColumn('month', month('creation_date')) \
    .groupBy('question_id', 'month').agg(count('*').alias('cnt'))

Query plan

== Physical Plan ==
*(2) HashAggregate(keys=[question_id#0L, month#110], functions=[count(1)])
+- Exchange hashpartitioning(question_id#0L, month#110, 200), true, [id=#100]
   +- *(1) HashAggregate(keys=[question_id#0L, month#110], functions=[partial_count(1)])
      +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#110]
         +- *(1) ColumnarToRow
            +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<question_id:bigint,creation_date:timestamp>

Sample outputs

+-----------+-----+---+
|question_id|month|cnt|
+-----------+-----+---+
|     358894|    9|  5|
|     332782|    5|  2|
|     281552|    9|  2|
|     332224|    5|  1|
|     395851|    3|  3|
|     192346|    7|  1|
|     302487|    1|  3|
|     317571|    3|  2|
|     179458|    5|  2|
|     294966|   11|  5|
|     199602|    8|  6|
|     251275|    4|  2|
|     208722|    9|  1|
|     284125|   10|  1|
|     427452|    9|  4|
|     399738|    4|  1|
|     217997|   11|  4|
|     386225|    2|  1|
|     305095|    1|  3|
|     206822|    9|  6|
+-----------+-----+---+

Join tables

resultDF = questionsDF.join(answers_month, 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')
resultDF.explain()

resultDF.orderBy('question_id', 'month').show()

Query plan

== Physical Plan ==
*(3) Project [question_id#12L, creation_date#14, title#15, month#110, cnt#126L]
+- *(3) BroadcastHashJoin [question_id#12L], [question_id#0L], Inner, BuildRight
   :- *(3) Project [question_id#12L, creation_date#14, title#15]
   :  +- *(3) Filter isnotnull(question_id#12L)
   :     +- *(3) ColumnarToRow
   :        +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#200]
      +- *(2) HashAggregate(keys=[question_id#0L, month#110], functions=[count(1)])
         +- Exchange hashpartitioning(question_id#0L, month#110, 200), true, [id=#196]
            +- *(1) HashAggregate(keys=[question_id#0L, month#110], functions=[partial_count(1)])
               +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#110]
                  +- *(1) Filter isnotnull(question_id#0L)
                     +- *(1) ColumnarToRow
                        +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [isnotnull(question_id#0L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp>

Query result:

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2014-12-31 19:59:...|Frost bubble form...|    2|  1|
|     155989|2014-12-31 19:59:...|Frost bubble form...|   12|  1|
|     155990|2014-12-31 20:51:...|The abstract spac...|    1|  2|
|     155992|2014-12-31 21:44:...|centrifugal force...|    1|  1|
|     155993|2014-12-31 21:56:...|How can I estimat...|    1|  1|
|     155995|2014-12-31 23:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 00:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 00:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 00:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 00:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 01:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 02:48:...|Capturing a light...|    1|  2|
|     156008|2015-01-01 02:48:...|Capturing a light...|   11|  1|
|     156016|2015-01-01 04:31:...|The interference ...|    1|  1|
|     156020|2015-01-01 05:19:...|What is going on ...|    1|  1|
|     156021|2015-01-01 05:21:...|How to calculate ...|    2|  1|
|     156022|2015-01-01 05:55:...|Advice on Major S...|    1|  1|
|     156025|2015-01-01 06:32:...|Deriving the Cano...|    1|  1|
|     156026|2015-01-01 06:49:...|Does Bell's inequ...|    1|  3|
|     156027|2015-01-01 06:49:...|Deriving X atom f...|    1|  1|
+-----------+--------------------+--------------------+-----+---+
only showing top 20 rows

Total number of rows: 73020

Optimizations

Change join order

resultDF = answers_month.join(questionsDF, 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

Query plan

== Physical Plan ==
*(3) Project [question_id#0L, creation_date#14, title#15, month#110, cnt#126L]
+- *(3) BroadcastHashJoin [question_id#0L], [question_id#12L], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#422]
   :  +- *(2) HashAggregate(keys=[question_id#0L, month#110], functions=[count(1)])
   :     +- Exchange hashpartitioning(question_id#0L, month#110, 200), true, [id=#418]
   :        +- *(1) HashAggregate(keys=[question_id#0L, month#110], functions=[partial_count(1)])
   :           +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#110]
   :              +- *(1) Filter isnotnull(question_id#0L)
   :                 +- *(1) ColumnarToRow
   :                    +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [isnotnull(question_id#0L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp>
   +- *(3) Project [question_id#12L, creation_date#14, title#15]
      +- *(3) Filter isnotnull(question_id#12L)
         +- *(3) ColumnarToRow
            +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>

With broadcast

from pyspark.sql.functions import broadcast
resultDF = questionsDF.join(broadcast(answers_month), 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

Query plan

== Physical Plan ==
*(3) Project [question_id#12L, creation_date#14, title#15, month#110, cnt#126L]
+- *(3) BroadcastHashJoin [question_id#12L], [question_id#0L], Inner, BuildRight
   :- *(3) Project [question_id#12L, creation_date#14, title#15]
   :  +- *(3) Filter isnotnull(question_id#12L)
   :     +- *(3) ColumnarToRow
   :        +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#878]
      +- *(2) HashAggregate(keys=[question_id#0L, month#110], functions=[count(1)])
         +- Exchange hashpartitioning(question_id#0L, month#110, 200), true, [id=#874]
            +- *(1) HashAggregate(keys=[question_id#0L, month#110], functions=[partial_count(1)])
               +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#110]
                  +- *(1) Filter isnotnull(question_id#0L)
                     +- *(1) ColumnarToRow
                        +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [isnotnull(question_id#0L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp>
  • One big difference compared to the original plan is that the broadcast approach replaces the Exchange with a BroadcastExchange.
  • Even though BroadcastExchange is faster than Exchange, the overall duration barely changes (9 s vs. 9 s).
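Note that even the original query plan already contains a BroadcastHashJoin: Spark broadcasts a join side automatically when its estimated size falls below spark.sql.autoBroadcastJoinThreshold (10 MB by default), which helps explain why the explicit broadcast() hint changes so little here. The threshold can be adjusted on a live session; a sketch, assuming spark is an already-created SparkSession:

```python
# Raise the automatic broadcast threshold from the 10 MB default to 50 MB.
# 'spark' is assumed to be an already-created SparkSession.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', str(50 * 1024 * 1024))

# Setting it to -1 disables automatic broadcast joins entirely:
# spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '-1')
```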

With cache

answers_month.cache()
resultDF = questionsDF.join(answers_month, 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

Query plan

== Physical Plan ==
*(2) Project [question_id#12L, creation_date#14, title#15, month#329, cnt#345L]
+- *(2) BroadcastHashJoin [question_id#12L], [question_id#0L], Inner, BuildRight
   :- *(2) Project [question_id#12L, creation_date#14, title#15]
   :  +- *(2) Filter isnotnull(question_id#12L)
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#1172]
      +- *(1) Filter isnotnull(question_id#0L)
         +- *(1) ColumnarToRow
            +- InMemoryTableScan [question_id#0L, month#329, cnt#345L], [isnotnull(question_id#0L)]
                  +- InMemoryRelation [question_id#0L, month#329, cnt#345L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(2) HashAggregate(keys=[question_id#0L, month#329], functions=[count(1)])
                           +- Exchange hashpartitioning(question_id#0L, month#329, 200), true, [id=#1128]
                              +- *(1) HashAggregate(keys=[question_id#0L, month#329], functions=[partial_count(1)])
                                 +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#329]
                                    +- *(1) ColumnarToRow
                                       +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<question_id:bigint,creation_date:timestamp>

Cache + broadcast

from pyspark.sql.functions import broadcast
answers_month.cache()
resultDF = questionsDF.join(broadcast(answers_month), 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

Query plan

== Physical Plan ==
*(2) Project [question_id#12L, creation_date#14, title#15, month#329, cnt#345L]
+- *(2) BroadcastHashJoin [question_id#12L], [question_id#0L], Inner, BuildRight
   :- *(2) Project [question_id#12L, creation_date#14, title#15]
   :  +- *(2) Filter isnotnull(question_id#12L)
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#1492]
      +- *(1) Filter isnotnull(question_id#0L)
         +- *(1) ColumnarToRow
            +- InMemoryTableScan [question_id#0L, month#329, cnt#345L], [isnotnull(question_id#0L)]
                  +- InMemoryRelation [question_id#0L, month#329, cnt#345L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(2) HashAggregate(keys=[question_id#0L, month#329], functions=[count(1)])
                           +- Exchange hashpartitioning(question_id#0L, month#329, 200), true, [id=#1128]
                              +- *(1) HashAggregate(keys=[question_id#0L, month#329], functions=[partial_count(1)])
                                 +- *(1) Project [question_id#0L, month(cast(creation_date#2 as date)) AS month#329]
                                    +- *(1) ColumnarToRow
                                       +- FileScan parquet [question_id#0L,creation_date#2] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/dtn/notebooks/data/answers], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<question_id:bigint,creation_date:timestamp>

Contributors

trdtnguyen
