
flint's Introduction

Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.


Requirements

Dependency       Version
Spark Version    2.3 and 2.4
Scala Version    2.12
Python Version   3.5 and above

How to install

The Scala artifact is published on Maven Central:

https://mvnrepository.com/artifact/com.twosigma/flint

The Python artifact is published on PyPI:

https://pypi.org/project/ts-flint

Note that you need both the Scala and the Python artifacts to use Flint with PySpark.

How to build

To build from source:

Scala (in top-level dir):

sbt assemblyNoTest

Python (in python subdir):

python setup.py install

or

pip install .

Python bindings

The python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started

Starting Point: TimeSeriesRDD and TimeSeriesDataFrame

The entry point into all time series functionality in Flint is TimeSeriesRDD (for Scala) and TimeSeriesDataFrame (for Python). At a high level, a TimeSeriesRDD contains an OrderedRDD, which represents a sequence of ordered key-value pairs. To represent a time series data set, a TimeSeriesRDD uses Long timestamps (nanoseconds since the epoch) as keys and InternalRows as values.
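As a small illustration of the timestamp convention described above, the following plain-Python sketch converts a timezone-aware datetime into nanoseconds since the epoch. The helper name `to_epoch_nanos` is hypothetical and is not part of Flint; it only shows what such a Long key would look like.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_nanos(dt):
    """Return nanoseconds since the Unix epoch for a timezone-aware datetime."""
    delta = dt - EPOCH
    # Integer arithmetic only, to avoid floating-point rounding on large values.
    micros = (delta.days * 86400 + delta.seconds) * 1000000 + delta.microseconds
    return micros * 1000

one_second = datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
print(to_epoch_nanos(one_second))  # 1000000000
```

Python's datetime only resolves microseconds, so the last three digits of the result are always zero in this sketch.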

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single csv file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with a header and a specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

One could also create a TimeSeriesRDD from a RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import org.apache.spark.sql.types.{DoubleType, LongType}
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have been sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as Parquet file(s). The TimeSeriesRDD.fromParquet() function provides the option to specify which columns and/or which time range you are interested in, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Group functions

A group function groups rows with nearby (or exactly the same) timestamps.

  • groupByCycle A function to group rows within a cycle, i.e. rows with exactly the same timestamp. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
  • groupByInterval A function to group rows whose timestamps fall into an interval. Intervals can be defined by another TimeSeriesRDD: its timestamps are used to define the intervals, i.e. two sequential timestamps define one interval. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]
  • addWindows For each row, this function adds a new column whose value is the list of rows within that row's window. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
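The three group functions above can be pictured with a plain-Python sketch over lists of (time, price) tuples. This is not Flint code and says nothing about how Flint distributes the work. The function names are hypothetical, and the boundary rules (intervals that include their begin and exclude their end, windows that include both ends) are assumptions read off the example outputs above.

```python
from bisect import bisect_right
from collections import OrderedDict

def group_by_cycle(rows):
    """Group rows that share exactly the same timestamp."""
    groups = OrderedDict()
    for row in rows:
        groups.setdefault(row[0], []).append(row)
    return list(groups.items())

def group_by_interval(rows, clock):
    """Assign each row to the interval [clock[i], clock[i + 1]) containing it."""
    groups = OrderedDict()
    for row in rows:
        i = bisect_right(clock, row[0]) - 1   # index of the last tick <= row time
        if 0 <= i < len(clock) - 1:           # skip rows outside every interval
            groups.setdefault(clock[i], []).append(row)
    return list(groups.items())

def add_windows(rows, length):
    """Append to each row the list of rows whose time lies in [t - length, t]."""
    return [
        row + ([r for r in rows if row[0] - length <= r[0] <= row[0]],)
        for row in rows
    ]

cycles = [(1000, 1.0), (1000, 2.0), (2000, 3.0), (2000, 4.0), (2000, 5.0)]
prices = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]

print(group_by_cycle(cycles))
print(group_by_interval(prices, [1000, 2000, 3000]))
print(add_windows(prices, 1000))
```

The quadratic scan in `add_windows` keeps the sketch short; a real implementation over sorted data would slide the window instead of rescanning.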

Temporal Join Functions

A temporal join function is a join function defined by a matching criterion over time. The tolerance in a temporal join specifies how far into the past or the future it should look for a match.

  • leftJoin A function that performs a temporal left-join to the right TimeSeriesRDD, i.e. a left-join using inexact timestamp matches. For each row in the left, it appends the most recent row from the right at or before the same time. An example joining two TimeSeriesRDDs is as follows.
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
  • futureLeftJoin A function that performs a temporal future left-join to the right TimeSeriesRDD, i.e. a left-join using inexact timestamp matches. For each row in the left, it appends the closest future row from the right at or after the same time.
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")
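The matching rule of the two temporal joins can be sketched in plain Python with binary search over sorted timestamps. This is an illustration of the semantics described above, not Flint's implementation. The function names and sample data are made up, tolerances are plain integers in the same unit as the timestamps, and treating the tolerance bound as inclusive is an assumption.

```python
from bisect import bisect_left, bisect_right

def left_join(left, right, tolerance):
    """Pair each left row with the latest right row at or before its time."""
    right_times = [r[0] for r in right]       # right must be sorted by time
    joined = []
    for row in left:
        i = bisect_right(right_times, row[0]) - 1
        ok = i >= 0 and row[0] - right_times[i] <= tolerance
        joined.append((row, right[i] if ok else None))
    return joined

def future_left_join(left, right, tolerance):
    """Pair each left row with the earliest right row at or after its time."""
    right_times = [r[0] for r in right]
    joined = []
    for row in left:
        i = bisect_left(right_times, row[0])
        ok = i < len(right) and right_times[i] - row[0] <= tolerance
        joined.append((row, right[i] if ok else None))
    return joined

left = [(1000, "a"), (2000, "b")]
right = [(900, 1.0), (1900, 2.0), (2500, 3.0)]

print(left_join(left, right, 200))
print(future_left_join(left, right, 500))
```

A left row with no match inside the tolerance keeps its place in the output with None on the right, which is what makes this a left join rather than an inner join.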

Summarize Functions

Summarize functions apply one or more summarizers to rows within a certain period, such as a cycle, an interval, or a window.

  • summarizeCycles A function that computes aggregate statistics of rows that are within a cycle, i.e. rows that share a timestamp.
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700
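For comparison, the same per-cycle sum can be written as a few lines of plain Python over a list of dictionaries. This is only a sketch of what the result means; the helper name `summarize_cycles_sum` is hypothetical and it is not how Flint computes the summary.

```python
from collections import OrderedDict

def summarize_cycles_sum(rows, column):
    """Sum one column over all rows that share a timestamp."""
    totals = OrderedDict()
    for row in rows:
        totals[row["time"]] = totals.get(row["time"], 0) + row[column]
    return list(totals.items())

volumes = [
    {"time": 1000, "id": 1, "volume": 100},
    {"time": 1000, "id": 2, "volume": 200},
    {"time": 2000, "id": 1, "volume": 300},
    {"time": 2000, "id": 2, "volume": 400},
]

print(summarize_cycles_sum(volumes, "volume"))  # [(1000, 300), (2000, 700)]
```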

Similarly, one can summarize over intervals, windows, or the whole time series data set. See:

  • summarizeIntervals
  • summarizeWindows
  • addSummaryColumns

One could check timeseries.summarize.summarizer for different kinds of summarizer(s), like ZScoreSummarizer, CorrelationSummarizer, NthCentralMomentSummarizer etc.

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to [email protected].

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC

flint's People

Contributors

aadamson, combinatorist, dzhang55, hkothari, hungtantran, icexelloss, jaebahk, leifwalsh, mforsyth, paulgb, wenbozhao


flint's Issues

Private Schema and other private methods and constructors

Is there a reason why the convenience object Schema is private?

private[timeseries] object Schema

For instance:

// preferred but not working because Schema private
val tsRdd = TimeSeriesRDD.fromRDD(sc.parallelize(data, defaultNumPartitions), Schema("time" -> LongType, "id" -> IntegerType, "price" -> DoubleType))(isSorted = true, timeUnit = TimeUnit.NANOSECONDS)

val schema = StructType(
  StructField("time", LongType) ::
    StructField("id", IntegerType) ::
    StructField("price", DoubleType) :: Nil)
val tsRdd1 = TimeSeriesRDD.fromRDD(sc.parallelize(data, defaultNumPartitions), schema)(isSorted = true, timeUnit = TimeUnit.NANOSECONDS)

Also, some TimeSeriesRDD constructors are private, which may be useful:

private[timeseries] def fromSeq(
    sc: SparkContext,
    rows: Seq[InternalRow],
    schema: StructType,
    isSorted: Boolean,
    numSlices: Int = 1
  ): TimeSeriesRDD

  private[flint] def fromOrderedRDD(
    rdd: OrderedRDD[Long, Row],
    schema: StructType
  ): TimeSeriesRDD = {
    val converter = CatalystTypeConvertersWrapper.toCatalystRowConverter(schema)
    TimeSeriesRDD.fromInternalOrderedRDD(rdd.mapValues {
      case (_, row) => converter(row)
    }, schema)
  }

Access to the OrderedRDD would also be valuable for testing, but that is private as well:

private[flint] def orderedRdd: OrderedRDD[Long, InternalRow]

This may open the implementation too much.

join not only by time but additionally also by column

How can I join not only by time but also by a column?

Currently, I get: Found duplicate columns, but I would like to perform the time series join per group.

val left = Seq((1,1L, 0.1), (1, 2L,0.2), (3,1L,0.3), (3, 2L,0.4)).toDF("group", "time", "valueA")
  val right = Seq((1,1L, 11), (1, 2L,12), (3,1L,13), (3, 2L,14)).toDF("group", "time", "valueB")
  val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
  val rightTS        = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

  val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")

fails due to duplicate columns.

When renaming the columns:

val left = Seq((1,1L, 0.1), (1, 2L,0.2), (3,1L,0.3), (3, 2L,0.4)).toDF("groupA", "time", "valueA")
  val right = Seq((1,1L, 11), (1, 2L,12), (3,1L,13), (3, 2L,14)).toDF("groupB", "time", "valueB")
  val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
  val rightTS        = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

  val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
  mergedPerGroup.toDF.printSchema
  mergedPerGroup.toDF.show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     1|   0.1|     3|    13|
|1000000|     3|   0.3|     3|    13|
|2000000|     1|   0.2|     3|    14|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+

A cross join is performed between each group and time series, which then needs to be reduced manually:

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|

Is there any functionality to perform this type of join more efficiently / built in?
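To make the desired result concrete, here is a plain-Python sketch of an as-of left join that only considers right-hand rows with the same group key, run on the sample data from this issue. It is not Flint code: `keyed_left_join` is a hypothetical helper, the tolerance is a plain integer in the same unit as the time column, and whether a Flint join option produces exactly this output is not something this sketch can confirm.

```python
from bisect import bisect_right
from collections import defaultdict

def keyed_left_join(left, right, tolerance):
    """As-of left join restricted to rows that share the same group key.

    Rows are (group, time, value) tuples; right must be sorted by time
    within each group.
    """
    by_group = defaultdict(list)
    for group, time, value in right:
        by_group[group].append((time, value))

    joined = []
    for group, time, value in left:
        candidates = by_group.get(group, [])
        times = [t for t, _ in candidates]
        i = bisect_right(times, time) - 1     # latest right row at or before
        if i >= 0 and time - times[i] <= tolerance:
            joined.append((time, group, value, candidates[i][1]))
        else:
            joined.append((time, group, value, None))
    return joined

left = [(1, 1, 0.1), (1, 2, 0.2), (3, 1, 0.3), (3, 2, 0.4)]
right = [(1, 1, 11), (1, 2, 12), (3, 1, 13), (3, 2, 14)]

for row in keyed_left_join(left, right, tolerance=1000):
    print(row)
```

Each left row is matched only against its own group, so no post-join filter on the group columns is needed.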

usage of merge of a summarizer

I created an experimental summarizer to try to understand how it works: https://github.com/soloman817/flint/blob/feature/experiment/src/test/scala/com/twosigma/flint/timeseries/experiment/AccumulateSummarizerSpec.scala

There I print information whenever a method of the summarizer, such as add or merge, is called.

I found that if I call TimeSeriesRDD.summarize(), subtract is never called, and if I have multiple partitions, merge is called. This means the aggregation over all rows runs in parallel and the partial results are eventually merged into the final result.

But if I run the summarizer with TimeSeriesRDD.summarizeWindow(), then add and subtract are called for the window aggregation, but not merge. This means that the computation inside one window is not parallel.

Am I right? This knowledge will be very helpful for implementing a solution to our problem, which is an extension of that experimental code.

Thanks,
Xiang.
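The two call patterns described in this question can be mimicked with a toy summarizer in plain Python. The sketch below is consistent with the observations above but is not Flint's implementation and does not confirm how Flint schedules the work. The class and function names are hypothetical, and "partitions" are just nested lists.

```python
from collections import deque

class SumSummarizer:
    """Toy summarizer exposing zero, add, subtract and merge."""
    def zero(self):
        return 0.0
    def add(self, state, value):
        return state + value
    def subtract(self, state, value):
        return state - value
    def merge(self, state1, state2):
        return state1 + state2

def summarize(partitions, summarizer):
    """Fold every partition with add only, then combine partials with merge."""
    partials = []
    for partition in partitions:              # each fold could run in parallel
        state = summarizer.zero()
        for value in partition:
            state = summarizer.add(state, value)
        partials.append(state)
    total = summarizer.zero()
    for partial in partials:
        total = summarizer.merge(total, partial)
    return total

def summarize_windows(rows, length, summarizer):
    """Slide a [t - length, t] window over sorted rows with add and subtract."""
    window, state, out = deque(), summarizer.zero(), []
    for time, value in rows:
        window.append((time, value))
        state = summarizer.add(state, value)
        while window[0][0] < time - length:   # evict rows that left the window
            _, old = window.popleft()
            state = summarizer.subtract(state, old)
        out.append((time, state))
    return out

prices = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]
print(summarize([[1.0, 2.0], [3.0, 4.0]], SumSummarizer()))   # 10.0
print(summarize_windows(prices, 1000, SumSummarizer()))
```

In this sketch the sliding window reuses one running state, which is why it needs subtract but never merge, while the whole-series fold needs merge but never subtract.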

Python Clocks Not returning proper intervals.

@icexelloss

The clocks function for Flint in python is returning incorrect intervals.

The time intervals are far larger than the frequency I pass to the function.

For example:

from ts.flint import clocks
clock = clocks.uniform(sqlContext, frequency="1s", offset="0ns")
clock.show()

returns

time:timestamp
+-------------------+
|               time|
+-------------------+
|1970-01-01 00:00:00|
|1970-01-01 00:16:40|
|1970-01-01 00:33:20|
|1970-01-01 00:50:00|
|1970-01-01 01:06:40|
|1970-01-01 01:23:20|
|1970-01-01 01:40:00|
|1970-01-01 01:56:40|
|1970-01-01 02:13:20|
|1970-01-01 02:30:00|
|1970-01-01 02:46:40|
|1970-01-01 03:03:20|
|1970-01-01 03:20:00|
|1970-01-01 03:36:40|
|1970-01-01 03:53:20|
|1970-01-01 04:10:00|
|1970-01-01 04:26:40|
|1970-01-01 04:43:20|
|1970-01-01 05:00:00|
|1970-01-01 05:16:40|
+-------------------+
only showing top 20 rows

It should be 1 second intervals but returns intervals of 16 min 40 seconds.

Similarly, a frequency of 1 day returns intervals of more than 2 years (1,000 days).

from ts.flint import clocks
clock = clocks.uniform(sqlContext, frequency="1d", offset="0ns", )
clock.show()
+-------------------+
|               time|
+-------------------+
|1970-01-01 00:00:00|
|1972-09-27 00:00:00|
|1975-06-24 00:00:00|
|1978-03-20 00:00:00|
|1980-12-14 00:00:00|
|1983-09-10 00:00:00|
|1986-06-06 00:00:00|
|1989-03-02 00:00:00|
|1991-11-27 00:00:00|
|1994-08-23 00:00:00|
|1997-05-19 00:00:00|
|2000-02-13 00:00:00|
|2002-11-09 00:00:00|
|2005-08-05 00:00:00|
|2008-05-01 00:00:00|
|2011-01-26 00:00:00|
|2013-10-22 00:00:00|
|2016-07-18 00:00:00|
|2019-04-14 00:00:00|
|2022-01-08 00:00:00|
+-------------------+
only showing top 20 rows

Also when I supply custom start and end times, the years returned are way out of range.

from ts.flint import clocks
clock = clocks.uniform(sqlContext, frequency="1d", offset="0ns", begin_date_time="2014-04-23", end_date_time="2015-04-23")
clock.show()
time:timestamp
+--------------------+
|                time|
+--------------------+
|46277-07-20 00:00...|
|46280-04-15 00:00...|
|46283-01-10 00:00...|
|46285-10-06 00:00...|
|46288-07-02 00:00...|
|46291-03-29 00:00...|
|46293-12-23 00:00...|
|46296-09-18 00:00...|
|46299-06-15 00:00...|
|46302-03-12 00:00...|
|46304-12-06 00:00...|
|46307-09-02 00:00...|
|46310-05-29 00:00...|
|46313-02-22 00:00...|
|46315-11-19 00:00...|
|46318-08-15 00:00...|
|46321-05-11 00:00...|
|46324-02-05 00:00...|
|46326-11-01 00:00...|
|46329-07-28 00:00...|
+--------------------+
only showing top 20 rows
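A quick arithmetic check on the three outputs above shows the same factor in each case. The sketch below only uses timestamps copied from the outputs; the helper name `observed_ratio` is made up, and reading the factor of 1,000 as a time-unit mismatch somewhere between the Python and Scala layers is an assumption, not a confirmed diagnosis.

```python
from datetime import datetime, timedelta, timezone

def observed_ratio(first, second, requested):
    """Ratio between the observed step and the requested frequency."""
    return (second - first) / requested

fmt = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime("1970-01-01 00:00:00", fmt)
second_tick_1s = datetime.strptime("1970-01-01 00:16:40", fmt)
second_tick_1d = datetime.strptime("1972-09-27 00:00:00", fmt)

print(observed_ratio(start, second_tick_1s, timedelta(seconds=1)))  # 1000.0
print(observed_ratio(start, second_tick_1d, timedelta(days=1)))     # 1000.0

# Scale the requested begin date by the same factor and estimate the year.
begin = datetime(2014, 4, 23, tzinfo=timezone.utc)
scaled_seconds = begin.timestamp() * 1000
approx_year = 1970 + scaled_seconds / (365.2425 * 86400)
print(int(approx_year))  # 46277
```

The estimated year matches the first row of the third output, so all three symptoms are consistent with a single factor of 1,000.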

How to get the last row of a TimeSeriesRDD?

I have a time series RDD object, and I know internally it is sorted by the timestamps. What is the efficient way to get the start time and last time? There is a TimeSeriesRDD.first which returns the first row, so I can get the start time. But how to get the last row efficiently?

Method not found when creating time series RDD

val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

throws

scala> val tsRdd = TimeSeriesRDD.fromDF(dataFrame = cellFeed)(isSorted = true, timeUnit = MILLISECONDS)
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution$.apply$default$2()Lscala/Option;
  at com.twosigma.flint.timeseries.TimeSeriesStore$.isClustered(TimeSeriesStore.scala:149)
  at com.twosigma.flint.timeseries.TimeSeriesStore$.apply(TimeSeriesStore.scala:64)
  at com.twosigma.flint.timeseries.TimeSeriesRDD$.fromDFWithPartInfo(TimeSeriesRDD.scala:509)
  at com.twosigma.flint.timeseries.TimeSeriesRDD$.fromDF(TimeSeriesRDD.scala:304)
  ... 52 elided

on spark 2.2 when trying to create the initial RDD.

Minimal reproducible sample:

import spark.implicits._
  import com.twosigma.flint.timeseries.TimeSeriesRDD
  import scala.concurrent.duration._
  val df = Seq((1, 1, 1L), (2, 3, 1L), (1, 4, 2L), (2, 2, 2L)).toDF("id", "value", "time")
  val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

on spark 2.2 via HDP 2.6.4
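A NoSuchMethodError like this one usually indicates that a library was compiled against a different version of a dependency than the one on the runtime classpath. The Requirements section on this page lists Spark 2.3 and 2.4, so a small guard like the following plain-Python sketch could flag the mismatch early. The helper name and the hard-coded version set are illustrative assumptions; with PySpark the version string would typically come from `spark.version`.

```python
# Supported (major, minor) pairs, taken from the Requirements table on this page.
SUPPORTED_SPARK = {(2, 3), (2, 4)}

def is_supported_spark(version):
    """Return True if a Spark version string such as '2.3.1' is supported."""
    major, minor = (int(part) for part in version.split(".")[:2])
    return (major, minor) in SUPPORTED_SPARK

print(is_supported_spark("2.2.0"))  # False
print(is_supported_spark("2.3.1"))  # True
```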

sbt assemblyNoTest gives Null Pointer Exception

Ubuntu 18.04 with Java 8. Could you please help?
I get NPE when I run:
git clone https://github.com/twosigma/flint.git
cd flint
sbt assemblyNoTest

stacktrace is:

(base) jcarlson@smaaetlinuxsrv04:~/ts_flint/flint$ sbt assemblyNoTest
Picked up _JAVA_OPTIONS: -Xmx64g
Picked up _JAVA_OPTIONS: -Xmx64g
[info] Loading project definition from /home/jcarlson/ts_flint/flint/project
[info] Updating {file:/home/jcarlson/ts_flint/flint/project/}flint-build...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by sbt.ivyint.ErrorMessageAuthenticator$ (file:/home/jcarlson/.sbt/boot/scala-2.10.6/org.scala-sbt/sbt/0.13.11/ivy-0.13.11.jar) to field java.net.Authenticator.theAuthenticator
WARNING: Please consider reporting this to the maintainers of sbt.ivyint.ErrorMessageAuthenticator$
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
java.lang.NullPointerException
at java.base/java.util.regex.Matcher.getTextLength(Matcher.java:1770)
at java.base/java.util.regex.Matcher.reset(Matcher.java:416)
at java.base/java.util.regex.Matcher.<init>(Matcher.java:253)
at java.base/java.util.regex.Pattern.matcher(Pattern.java:1133)
at java.base/java.util.regex.Pattern.split(Pattern.java:1261)
at java.base/java.util.regex.Pattern.split(Pattern.java:1334)
at sbt.IO$.pathSplit(IO.scala:797)
at sbt.IO$.parseClasspath(IO.scala:912)
at sbt.compiler.CompilerArguments.extClasspath(CompilerArguments.scala:66)
at sbt.compiler.MixedAnalyzingCompiler$.withBootclasspath(MixedAnalyzingCompiler.scala:188)
at sbt.compiler.MixedAnalyzingCompiler$.searchClasspathAndLookup(MixedAnalyzingCompiler.scala:166)
at sbt.compiler.MixedAnalyzingCompiler$.apply(MixedAnalyzingCompiler.scala:176)
at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
at sbt.Compiler$.compile(Compiler.scala:152)
at sbt.Compiler$.compile(Compiler.scala:138)
at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(Defaults.scala:860)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:851)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:849)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:237)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[error] (compile:compileIncremental) java.lang.NullPointerException
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? q

Spark 2.3

Are there any plans to update Flint to run on Spark 2.3? I would love to be able to use pandas_udf's on my Time Series DataFrames.

Thanks!

[Python] Remove TimeUnit and TimeColumn TimeSeriesDataFrame

Currently TimeSeriesDataFrame stores a time unit and a time column name internally. This is not consistent with the Scala implementation of TimeSeriesRDD and causes extra copying and complexity. We should just force the timeUnit and timeColumn to be nanoseconds and 'time', respectively.

bug with com.fasterxml

Hi,

I've downloaded and built flint with sbt.

I then copied the .jar from the target folder into spark/jars.

But this creates a conflict with com.fasterxml when I try your first example code (CSV).

The error: com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.2

Do you know why?

Thks,

Compilation error

When running sbt assembly, compilation fails due to a missing slf4j dependency. It was resolved by adding

"org.slf4j" % "slf4j-simple" % "1.6.4"

to libraryDependencies, which allowed it to compile.

I had no problems, though, when I ran make dist.

[Python] dataframe summarize and summarizeCycles possible malfunction

I have a dataset whose time index is daily, and I want to sum all the values for the same day (same index). I can do this with the summarize method, but it doesn't work when I specify the key 'time' (my Flint time index).

Using:
flintdf.summarize(summarizers.sum('values'), key='time')
or
flintdf.summarizeCycles(summarizers.sum('values'), key='time')

gives me this error:

Py4JJavaError: An error occurred while calling o449.summarize.
: com.twosigma.flint.timeseries.row.DuplicateColumnsException: Found duplicate columns List(time) in schema...

I also tried using summarizeCycles without a key, but it sums over the time too and gives me the total over absolutely everything:

flintdf.summarizeCycles(summarizers.count())

returns me something like this:

|-------------time------------|count|
|9223372036854775807| 3030|

I think this could be another possible malfunction, because there are many different timestamps in my dataset and as the documentation says,

"Computes aggregate statistics of rows that share a timestamp." .

The last thing I tried was to use another date field, which lets me call summarize and summarizeCycles with the date as key. But summarize appears to delete the time index, making all its values 0 and leaving the resulting dataframe unsorted.
summarizeCycles with a key returns the same dataframe but keeps only the first row for each timestamp: the rows with a repeated index are dropped, and every row gets the same index, which is the sum of all the times, in my case 9223372036854775807.

Python version: 3.5
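One detail about the value 9223372036854775807 reported above: it is exactly the largest signed 64-bit integer, which the short check below confirms. Why this value appears as the time index is not established here; reading it as an overflow or a sentinel rather than a genuine sum is only an assumption.

```python
from datetime import datetime, timedelta, timezone

LONG_MAX = 2 ** 63 - 1          # largest value of a signed 64-bit integer
reported = 9223372036854775807  # time value shown in the output above

print(reported == LONG_MAX)     # True

# Interpreted as nanoseconds since the epoch, the value lands in the year 2262.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
as_datetime = epoch + timedelta(microseconds=LONG_MAX // 1000)
print(as_datetime.year)         # 2262
```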

compatibility with sparklyr?

Hello, and thanks for this very interesting project. I wonder whether it can be used with sparklyr. Any ideas?

thanks!

Extending Window capabilities

I would like to extend the window capabilities and discuss how best to implement these. With the existing functionality we can do:

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
  1. Window at predefined time stamps only
    The existing function creates a backward-looking window at each row. For very "dense" time series with nanosecond-scale samples, we might not want a window at every observation, but instead run some statistics or another discovery method to find the points at which to create a window.

  2. Windows of varying length
    In the trading world we can imagine windows of varying time length, e.g. determined by a "volume clock"

  3. Windows of a fixed number of observations. I saw a count window, but it is not clear to me how to use it.

  4. Two segment windows.
    A first segment (section) of a window could be used to calculate some online statistics which are then consumed by a summary function which is applied over the second part of the window (adaptive summary stats, e.g. consider thresholds based on an online volatility estimator).

How would these more general features best be implemented? Any good advice on how to add these extensions? Happy to contribute as well.
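As a starting point for the discussion, item 3 (windows of a fixed number of observations) can be sketched in plain Python with a bounded deque. This only illustrates the intended semantics; `add_count_windows` is a hypothetical name, the window is assumed to include the current row, and it is not related to any existing count window in Flint.

```python
from collections import deque

def add_count_windows(rows, count):
    """Append to each row the last `count` rows up to and including itself."""
    window = deque(maxlen=count)   # the oldest row is dropped automatically
    out = []
    for row in rows:
        window.append(row)
        out.append(row + (list(window),))
    return out

prices = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]
for row in add_count_windows(prices, 2):
    print(row)
```

Unlike the time-based window, the span of time covered here varies with the sampling density, which is the property the varying-length proposal in item 2 also relies on.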

"leftJoin" is not working when joining a clock with a Parquet-based DataFrame

@icexelloss

Even though the "timeColumn" argument error can be bypassed by renaming the column in question to time, leftJoin is not working for me:

print( sc.version )
print( tm )

n = df.filter( df['Container'] == 'dbc94d4e3af6' ).select( tm, 'MemPercentG', 'CpuPercentG' )

from ts.flint import FlintContext, clocks
from ts.flint import utils
  
fc = FlintContext( sqlContext )

r = fc.read \
    .option('isSorted', False) \
    .option('timeUnit', 's') \
    .dataframe( n )

r.show(truncate=False)
print( r )
r.printSchema()

l = clocks.uniform(fc, '30s', begin_date_time='2018-8-1 5:55:35', end_date_time='2018-08-01 05:59:05')
print( type( l ) )
print( l )
l.printSchema()
# l.show(truncate=False)
j = l.leftJoin( r )

With the output being:

2.3.1
time
+-------------------+---------------+------------+
|time               |MemPercentG    |CpuPercentG |
+-------------------+---------------+------------+
|2018-08-01 05:55:35|0.0030517578125|0.002331024 |
|2018-08-01 05:58:05|0.0030517578125|0.0031538776|
|2018-08-01 05:59:05|0.0030517578125|0.0030176123|
+-------------------+---------------+------------+

TimeSeriesDataFrame[time: timestamp, MemPercentG: double, CpuPercentG: float]
root
 |-- time: timestamp (nullable = true)
 |-- MemPercentG: double (nullable = true)
 |-- CpuPercentG: float (nullable = true)

<class 'ts.flint.dataframe.TimeSeriesDataFrame'>
TimeSeriesDataFrame[time: timestamp]
root
 |-- time: timestamp (nullable = true)

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-911439891027714> in <module>()
     22 l.printSchema()
     23 # l.show(truncate=False)
---> 24 j = l.leftJoin( r )
     25 
     26 

/databricks/python/lib/python3.5/site-packages/ts/flint/dataframe.py in leftJoin(self, right, tolerance, key, left_alias, right_alias)
    606         tolerance = self._timedelta_ns('tolerance', tolerance, default='0ns')
    607         scala_key = utils.list_to_seq(self._sc, key)
--> 608         tsrdd = self.timeSeriesRDD.leftJoin(right.timeSeriesRDD, tolerance, scala_key, left_alias, right_alias)
    609         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    610 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3324.leftJoin.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
	at org.apache.spark.sql.TimestampCast$class.doGenCode(TimestampCast.scala:77)
	at org.apache.spark.sql.NanosToTimestamp.doGenCode(TimestampCast.scala:31)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:111)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:143)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1367)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1366)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1227)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressionsForWholeStageWithCSE(CodeGenerator.scala:1365)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:67)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
	at org.apache.spark.sql.execution.RDDScanExec.consume(ExistingRDD.scala:176)
	at org.apache.spark.sql.execution.RDDScanExec.doProduce(ExistingRDD.scala:221)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.RDDScanExec.produce(ExistingRDD.scala:176)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:530)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:582)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
	at com.twosigma.flint.timeseries.NormalizedDataFrameStore.toOrderedRdd(TimeSeriesStore.scala:252)
	at com.twosigma.flint.timeseries.NormalizedDataFrameStore.orderedRdd(TimeSeriesStore.scala:237)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.orderedRdd(TimeSeriesRDD.scala:1346)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.leftJoin(TimeSeriesRDD.scala:1515)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

By the way, how do I even display the contents of the clocks DataFrame?
The second-to-last commented-out line (the one with the .show command) errors out, so I don't understand how TimeSeriesDataFrame can inherit from a regular DataFrame, for which that method is available.
The display method also fails.

Anyway, what is wrong with the leftJoin here? The clock is on the left, as you indicated it should be. Swapping the left and right data frames does not help either.

Is this reproducible for you?

Please advise whether I'm calling it incorrectly or whether this is a bug.

I installed the Flint libraries (both the Scala and the Python one) on Databricks via its UI, from the respective online repositories, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help.

Thanks.

While building this sbt project, I got the error below:

`[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: org.slf4j#slf4j-api;1.7.7: several problems occurred while resolving dependency: org.slf4j#slf4j-api;1.7.7 {compile=[compile(), master(compile)], runtime=[runtime()]}:
[warn] several problems occurred while resolving dependency: org.slf4j#slf4j-parent;1.7.7 {}:
[warn] URI has an authority component
[warn] URI has an authority component
[warn]
[warn] URI has an authority component
[warn] :: org.apache.spark#spark-core_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-core_2.11;2.1.0 {provided=[default(compile)]}:
[warn] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[warn] URI has an authority component
[warn] URI has an authority component
[warn]
[warn] URI has an authority component
[warn] :: org.apache.spark#spark-mllib_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-mllib_2.11;2.1.0 {provided=[default(compile)]}:
[warn] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[warn] URI has an authority component
[warn] URI has an authority component
[warn]
[warn] URI has an authority component
[warn] :: org.apache.spark#spark-sql_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-sql_2.11;2.1.0 {provided=[default(compile)]}:
[warn] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[warn] URI has an authority component
[warn] URI has an authority component
[warn]
[warn] URI has an authority component
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] org.slf4j:slf4j-api:1.7.7
[warn] +- org.clapper:grizzled-slf4j_2.11:1.3.0 (D:\flint-master\build.sbt#L98)
[warn] +- com.twosigma:flint_2.11:0.2.0-SNAPSHOT
[warn] org.apache.spark:spark-core_2.11:2.1.0 (D:\flint-master\build.sbt#L98)
[warn] +- com.twosigma:flint_2.11:0.2.0-SNAPSHOT
[warn] org.apache.spark:spark-mllib_2.11:2.1.0 (D:\flint-master\build.sbt#L98)
[warn] +- com.twosigma:flint_2.11:0.2.0-SNAPSHOT
[warn] org.apache.spark:spark-sql_2.11:2.1.0 (D:\flint-master\build.sbt#L98)
[warn] +- com.twosigma:flint_2.11:0.2.0-SNAPSHOT
sbt.ResolveException: unresolved dependency: org.slf4j#slf4j-api;1.7.7: several problems occurred while resolving dependency: org.slf4j#slf4j-api;1.7.7 {compile=[compile(), master(compile)], runtime=[runtime()]}:
several problems occurred while resolving dependency: org.slf4j#slf4j-parent;1.7.7 {}:
URI has an authority component
URI has an authority component

    URI has an authority component

unresolved dependency: org.apache.spark#spark-core_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-core_2.11;2.1.0 {provided=[default(compile)]}:
several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
URI has an authority component
URI has an authority component

    URI has an authority component

unresolved dependency: org.apache.spark#spark-mllib_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-mllib_2.11;2.1.0 {provided=[default(compile)]}:
several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
URI has an authority component
URI has an authority component

    URI has an authority component

unresolved dependency: org.apache.spark#spark-sql_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-sql_2.11;2.1.0 {provided=[default(compile)]}:
several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
URI has an authority component
URI has an authority component

    URI has an authority component

    at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
    at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
    at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
    at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
    at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
    at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
    at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
    at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
    at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:95)
    at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:80)
    at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:99)
    at xsbt.boot.Using$.withResource(Using.scala:10)
    at xsbt.boot.Using$.apply(Using.scala:9)
    at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:60)
    at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:50)
    at xsbt.boot.Locks$.apply0(Locks.scala:31)
    at xsbt.boot.Locks$.apply(Locks.scala:28)
    at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
    at sbt.IvySbt.withIvy(Ivy.scala:128)
    at sbt.IvySbt.withIvy(Ivy.scala:125)
    at sbt.IvySbt$Module.withModule(Ivy.scala:156)
    at sbt.IvyActions$.updateEither(IvyActions.scala:168)
    at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1439)
    at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1435)
    at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$90.apply(Defaults.scala:1470)
    at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$90.apply(Defaults.scala:1468)
    at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:37)
    at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1473)
    at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1467)
    at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:60)
    at sbt.Classpaths$.cachedUpdate(Defaults.scala:1490)
    at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1417)
    at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1369)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[error] (:update) sbt.ResolveException: unresolved dependency: org.slf4j#slf4j-api;1.7.7: several problems occurred while resolving dependency: org.slf4j#slf4j-api;1.7.7 {compile=[compile(), master(compile)], runtime=[runtime(*)]}:
[error] several problems occurred while resolving dependency: org.slf4j#slf4j-parent;1.7.7 {}:
[error] URI has an authority component
[error] URI has an authority component
[error]
[error] URI has an authority component
[error]
[error] unresolved dependency: org.apache.spark#spark-core_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-core_2.11;2.1.0 {provided=[default(compile)]}:
[error] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[error] URI has an authority component
[error] URI has an authority component
[error]
[error] URI has an authority component
[error]
[error] unresolved dependency: org.apache.spark#spark-mllib_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-mllib_2.11;2.1.0 {provided=[default(compile)]}:
[error] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[error] URI has an authority component
[error] URI has an authority component
[error]
[error] URI has an authority component
[error]
[error] unresolved dependency: org.apache.spark#spark-sql_2.11;2.1.0: several problems occurred while resolving dependency: org.apache.spark#spark-sql_2.11;2.1.0 {provided=[default(compile)]}:
[error] several problems occurred while resolving dependency: org.apache.spark#spark-parent_2.11;2.1.0 {}:
[error] URI has an authority component
[error] URI has an authority component
[error]
[error] URI has an authority component
[error] Total time: 32 s, completed Feb 9, 2018 3:53:50 PM`

Can you help me resolve this issue with building the sbt project?

Design question: Can I use LeftSubtractableSummarizer for snapshot reconstruction

I just dove into the Flint source code, and it looks very good. I now have a new requirement and am trying to create a custom summarizer. I would appreciate some feedback on the usage.

Say we have a time series of events. Some events are snapshots, but most are deltas, which are applied to the previous snapshot. A snapshot might be very large, say, all the levels of an order book. This is why we cannot store the snapshot (the actual state) in the time series data.

So I started with a simplified version. Say the snapshot is just a Long, and the delta is also a Long, like this:

+----+--------+-----+
|time|snapshot|delta|
+----+--------+-----+
|1000|    null|    1|
|1010|    null|    1|
|1050|       8| null|
|1100|    null|    1|
|1200|    null|    1|
|1250|    null|    1|
|1350|      11| null|
|1550|    null|    1|
+----+--------+-----+

From there, I want to create a summarizer that can reconstruct the snapshots and then extract some information from the snapshot at each event. Say I want the final data frame to look like this:

+----+--------+-----+----------------------+
|time|snapshot|delta|reconstructed_snapshot|
+----+--------+-----+----------------------+
|1000|    null|    1|                  null|
|1010|    null|    1|                  null|
|1050|       8| null|                     8|
|1100|    null|    1|                     9|
|1200|    null|    1|                    10|
|1250|    null|    1|                    11|
|1350|      11| null|                    11|
|1550|    null|    1|                    12|
+----+--------+-----+----------------------+

The first two rows are still null because there is no start snapshot. But after the first snapshot, we should be able to reconstruct the snapshot for each event.

Now I want to write a custom summarizer, and I saw that most of the summarizers extend LeftSubtractableSummarizer. I also found a property test for LeftSubtractableProperty, which says: "Check if (a + b) + c - a = b + c"

Logically, snapshot reconstruction is left subtractable. But in practice, for a window, we must know the snapshot at the first event in that window; otherwise the state cannot be reconstructed. For this reason, I started by defining my summarizer as: case class Experiment1Summarizer() extends LeftSubtractableSummarizer[Option[Long], Option[Long], Option[Long]]

The three types T, U and V are all Option[Long] for this reason. So it will obviously not pass the LeftSubtractableProperty. For example:

a = |1050|       8| null|
b = |1100|    null|    1|
c = |1200|    null|    1|
a + b + c = Some(10), because a is a snapshot
a + b + c - a = Some(10), because removing an old event does not affect the state
but b + c = None, because there is no starting snapshot.

So, does it make sense to use LeftSubtractableSummarizer to implement this?

Regards,
Xiang.
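The reconstruction described above can be sketched in plain Python, independent of Flint. This is only an illustration of the intended semantics: the `reconstruct` function and the tuple layout `(time, snapshot, delta)` are assumptions made for the example, not part of any Flint API.

```python
from typing import Iterable, List, Optional, Tuple

# One event: (time, snapshot, delta). Exactly one of snapshot/delta is set.
Row = Tuple[int, Optional[int], Optional[int]]


def reconstruct(rows: Iterable[Row]) -> List[Optional[int]]:
    """Replay events in time order and return the state after each one.

    The state stays None until the first snapshot is seen, because a delta
    cannot be applied without a base state.
    """
    state: Optional[int] = None
    out: List[Optional[int]] = []
    for _time, snapshot, delta in rows:
        if snapshot is not None:
            state = snapshot          # a snapshot replaces the state
        elif state is not None and delta is not None:
            state += delta            # a delta adjusts a known state
        out.append(state)
    return out


rows: List[Row] = [
    (1000, None, 1), (1010, None, 1), (1050, 8, None), (1100, None, 1),
    (1200, None, 1), (1250, None, 1), (1350, 11, None), (1550, None, 1),
]

print(reconstruct(rows))       # [None, None, 8, 9, 10, 11, 11, 12]
# A window that starts after the snapshot (b and c only) has no base state:
print(reconstruct(rows[3:5]))  # [None, None]
```

The second call shows the same gap as the a, b, c example: the window containing only the two deltas cannot recover the value 10 that the longer window produces.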

[Python] Enable python testing

We are able to test the Python bindings using an internal library that creates the SparkContext. We need a way to run the Flint tests with PySpark in the OSS version.

Using flint with pyspark on yarn

Hi,
I'm trying to use Flint in a PySpark job submitted on YARN.

>> ./bin/pyspark --master yarn --deploy-mode client --jars /opt/flint-assembly-0.2.0-SNAPSHOT.jar --py-files /opt/flint-assembly-0.2.0-SNAPSHOT.jar`
[..]
SparkSession available as 'spark'.
>>> import ts.flint
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'ts'
>>> import sys
>>> sys.path
['', '/tmp/spark-1d453f8f-379a-4f22-a7e4-cabe9dad15c5/userFiles-0b6ed883-15b1-4893-acb8-977f74b09913/flint-assembly-0.2.0-SNAPSHOT.jar', '/tmp/spark-1d453f8f-379a-4f22-a7e4-cabe9dad15c5/userFiles-0b6ed883-15b1-4893-acb8-977f74b09913', '/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip', '/usr/hdp/2.6.1.0-129/spark2/python', '/usr/hdp/2.6.1.0-129/spark2', '/opt/miniconda3/envs/lhd_spark/lib/python36.zip', '/opt/miniconda3/envs/lhd_spark/lib/python3.6', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/lib-dynload', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/site-packages', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/site-packages/setuptools-27.2.0-py3.6.egg']

The same approach works properly with a local master, while on YARN it seems to refer to an invalid path and the import fails.

I was able to use the library by following this similar topic: extracting the Python code from the jar and copying it into my working directory.

Is that OK, or is there a better way to proceed?
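The workaround mentioned above (pulling the Python sources out of the jar) could look roughly like the sketch below. It assumes the assembly jar is an ordinary zip archive with the Python package under a top-level `ts/` directory; the helper name `extract_python_package` is hypothetical.

```python
import zipfile
from typing import List


def extract_python_package(jar_path: str, package: str, dest_dir: str) -> List[str]:
    """Copy every .py file under `package/` from the jar into dest_dir.

    Returns the archive names that were extracted, sorted.
    """
    prefix = package.rstrip("/") + "/"
    extracted: List[str] = []
    with zipfile.ZipFile(jar_path) as jar:
        for name in jar.namelist():
            # Skip compiled JVM classes and anything outside the package.
            if name.startswith(prefix) and name.endswith(".py"):
                jar.extract(name, dest_dir)
                extracted.append(name)
    return sorted(extracted)


# Example call, using the jar path from the command above:
# extract_python_package("/opt/flint-assembly-0.2.0-SNAPSHOT.jar", "ts", ".")
```

After extraction, the destination directory has to be on `sys.path` (the working directory usually is) for `import ts.flint` to resolve.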

NoSuchMethodError: internalCreateDataFrame

Thank you for this amazing library! 🥇
I'm running Spark 2.2.0 and tried to initialize a clock:

clock = clocks.uniform(sqlContext, frequency="1day")

This threw an exception:

py4j.protocol.Py4JJavaError: An error occurred while calling z:com.twosigma.flint.timeseries.Clocks.uniform.
: java.lang.NoSuchMethodError: org.apache.spark.sql.SparkSession.internalCreateDataFrame$default$3()Z
	at org.apache.spark.sql.DFConverter$.toDataFrame(DFConverter.scala:42)
	at com.twosigma.flint.timeseries.clock.Clock.asTimeSeriesRDD(Clock.scala:148)
	at com.twosigma.flint.timeseries.Clocks$.uniform(Clocks.scala:54)
	at com.twosigma.flint.timeseries.Clocks.uniform(Clocks.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

NoSuchMethodError: org.apache.spark.sql.SparkSession.internalCreateDataFrame$default$3()Z

I found this method in the docs: https://spark.apache.org/docs/preview/api/java/org/apache/spark/sql/SparkSession.html#internalCreateDataFrame(org.apache.spark.rdd.RDD,%20org.apache.spark.sql.types.StructType)

However, it does not seem to be available in the version I'm running. Can you please give me a hint on how to resolve this issue? Thanks!
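One common reading of a NoSuchMethodError like this is a binary mismatch: the jar was compiled against a different Spark release than the one that is running. The README's requirements table lists Spark 2.3 and 2.4, so a quick guard can be sketched as below. The helper name is hypothetical, and the supported set is copied from that table and may not match the Flint build actually in use.

```python
# Supported (major, minor) Spark versions, taken from the README table.
# This is an assumption about the installed Flint build, not a guarantee.
SUPPORTED_SPARK = {(2, 3), (2, 4)}


def is_supported(spark_version: str) -> bool:
    """Return True if the 'major.minor' prefix of the version is supported."""
    major, minor = spark_version.split(".")[:2]
    return (int(major), int(minor)) in SUPPORTED_SPARK


# In a PySpark session the running version is available as spark.version.
print(is_supported("2.2.0"))  # False
print(is_supported("2.4.4"))  # True
```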

Narrow dependency

I saw this code in overlapped RDD implementation: https://github.com/twosigma/flint/blob/master/src/main/scala/com/twosigma/flint/rdd/OverlappedOrderedRDD.scala#L50

Then I read about what a narrow dependency means: https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies , where it says: "Narrow dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD."

But in the Flint implementation, the overlappedRDD is the child RDD and the OrderedRDD is its parent. Because of the overlapping, one partition in the parent is used by multiple partitions in the child overlappedRDD.

So, what am I missing here?

Thanks in advance, Xiang
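The situation described above can be pictured with a toy partition mapping. The sketch below is not Flint's code: the function names and the rule that each child partition also reads the one partition before it are assumptions for illustration. For comparison, the Scaladoc for Spark's `NarrowDependency` class phrases the condition from the child's side (each child partition depends on a small number of parent partitions, returned by `getParents(partitionId)`), which is a weaker requirement than the wiki's wording.

```python
from typing import Dict, List


def parents_of(child: int, lookback: int = 1) -> List[int]:
    """Parent partitions read by one child partition (toy overlap rule)."""
    return list(range(max(0, child - lookback), child + 1))


def children_of(num_partitions: int, lookback: int = 1) -> Dict[int, List[int]]:
    """Invert the mapping: which child partitions read each parent partition."""
    readers: Dict[int, List[int]] = {p: [] for p in range(num_partitions)}
    for child in range(num_partitions):
        for parent in parents_of(child, lookback):
            readers[parent].append(child)
    return readers


# Each child reads a small, fixed set of parents ...
print([parents_of(c) for c in range(4)])  # [[0], [0, 1], [1, 2], [2, 3]]
# ... even though a parent can feed more than one child.
print(children_of(4))  # {0: [0, 1], 1: [1, 2], 2: [2, 3], 3: [3]}
```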

TypeError: 'JavaPackage' object is not callable

Whenever I try to use Flint locally (no Hadoop/EMR involved), it keeps barfing at me with the error message in the subject. The setup is Python 3.7 with PySpark 2.4.4 and OpenJDK 8 on an Ubuntu 19.04 install.

Note: As I'm running locally only, I'm getting this log message from Spark, but everything does run perfectly using vanilla PySpark:

19/10/23 09:59:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

It happens when I try to read a PySpark DataFrame into a ts.flint.TimeSeriesDataFrame. This example is adapted from the Flint Example.ipynb:

import pyspark
import ts.flint
from ts.flint import FlintContext

sc = pyspark.SparkContext('local', 'Flint Example')
spark = pyspark.sql.SparkSession(sc)
flint_context = FlintContext(spark)

sp500 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('sp500.csv')
    .withColumnRenamed('Date', 'time')
)
sp500 = flint_context.read.dataframe(sp500)

The last line causes the "boom"; here is the first part of the stack trace:

TypeError                                 Traceback (most recent call last)
~/.virtualenvs/pyspark-test/lib/python3.7/site-packages/ts/flint/java.py in new_reader(self)
     37         try:
---> 38             return utils.jvm(self.sc).com.twosigma.flint.timeseries.io.read.TSReadBuilder()
     39         except TypeError:

TypeError: 'JavaPackage' object is not callable

Any ideas what may be going wrong and how the problem could be solved?

Regression/summarizer performance collapse?

Hello, I am using many regressions in parallel over a single call to summarize. I've noticed that if I run ~20 regressions on a dataset with 5M rows, it seems to take 45-60 minutes to summarize. If I run a single regression on a similarly-sized dataset, however, it only takes a minute or two to summarize. What kinds of performance characteristics should I expect, and how can I avoid this kind of performance collapse?

Thank you!

After summarizeWindows, the Flint DataFrame cannot be converted back to a Spark DataFrame

I am able to convert a Spark DataFrame to a Flint DataFrame, and applying the summarizeWindows function also works.
After applying any Flint function, however, I cannot access the resulting DataFrame: I can't convert it back to a Spark DataFrame or save the result to a file.

Even flint.show() or spark_df.show() raises an error. The function seems to work correctly; I just can't access the result DataFrame.

Test failure in ClockSpec tests

When running sbt assembly, I get an error that looks like it's picking up an ambient timezone somewhere in my environment:

[info] UniformClock
[info] - should generate clock ticks correctly (4 milliseconds)
[info] - should generate clock ticks in RDD correctly (90 milliseconds)
[info] - should generate clock ticks in TimeSeriesRDD correctly (164 milliseconds)
[info] - should generate clock ticks with offset in TimeSeriesRDD correctly (74 milliseconds)
[info] - should generate clock ticks with offset & time zone in TimeSeriesRDD correctly (75 milliseconds)
[info] - should generate clock ticks with default in TimeSeriesRDD correctly (172 milliseconds)
[info] - should generate timestamp correctly *** FAILED *** (160 milliseconds)
[info]   1989-12-31 18:00:00.0 did not equal 1990-01-01 00:00:00.0 (ClockSpec.scala:85)

Is this a known issue, or a known problem in my setup maybe?
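The two timestamps in the failure are exactly six hours apart, which is what one would see if the same instant were rendered once in UTC and once in a zone six hours behind UTC. The sketch below illustrates that arithmetic only; that the test machine's default zone is UTC-6 is an assumption, not something the log confirms.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"

# The instant the test expects, expressed in UTC.
instant = datetime(1990, 1, 1, tzinfo=timezone.utc)

# A fixed-offset zone six hours behind UTC (assumed ambient zone).
utc_minus_6 = timezone(timedelta(hours=-6))

as_utc = instant.strftime(FMT)
as_local = instant.astimezone(utc_minus_6).strftime(FMT)

print(as_utc)    # 1990-01-01 00:00:00
print(as_local)  # 1989-12-31 18:00:00
```

If that is indeed the cause, running the build with the JVM default zone set to UTC (for instance through the standard `user.timezone` system property) may be worth trying.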

error in installation while running sbt assemblyNoTest

I am getting a different error from the one in issue #24.

Can you please let me know where I am going wrong? The error follows.

~/flint-master$ sbt assemblyNoTest
[ERROR] Failed to construct terminal; falling back to unsupported
java.lang.NumberFormatException: For input string: "0x100"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.valueOf(Integer.java:983)
at jline.internal.InfoCmp.parseInfoCmp(InfoCmp.java:59)
at jline.UnixTerminal.parseInfoCmp(UnixTerminal.java:233)
at jline.UnixTerminal.<init>(UnixTerminal.java:64)
at jline.UnixTerminal.<init>(UnixTerminal.java:49)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:488)
at java.base/java.lang.Class.newInstance(Class.java:560)
at jline.TerminalFactory.getFlavor(TerminalFactory.java:209)
at jline.TerminalFactory.create(TerminalFactory.java:100)
at jline.TerminalFactory.get(TerminalFactory.java:184)
at jline.TerminalFactory.get(TerminalFactory.java:190)
at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:123)
at sbt.ConsoleLogger$.<init>(ConsoleLogger.scala:117)
at sbt.ConsoleLogger$.<clinit>(ConsoleLogger.scala)
at sbt.GlobalLogging$.initial(GlobalLogging.scala:43)
at sbt.StandardMain$.initialGlobalLogging(Main.scala:61)
at sbt.StandardMain$.initialState(Main.scala:70)
at sbt.xMain.run(Main.scala:29)
at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
at xsbt.boot.Launch$.run(Launch.scala:109)
at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
at xsbt.boot.Launch$.launch(Launch.scala:117)
at xsbt.boot.Launch$.apply(Launch.scala:18)
at xsbt.boot.Boot$.runImpl(Boot.scala:56)
at xsbt.boot.Boot$.main(Boot.scala:18)
at xsbt.boot.Boot.main(Boot.scala)

Getting error when running 'make dist'

Hi,

I am trying to install flint on my mac but when I run 'make dist', I get this error:

Himanshus-MacBook-Pro:flint-master himanshugupta$ make dist
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
sbt "set test in assembly := {}" clean assembly
[info] Loading project definition from /Users/himanshugupta/spark/flint-master/project
java.lang.NullPointerException
at java.base/java.util.regex.Matcher.getTextLength(Matcher.java:1769)
at java.base/java.util.regex.Matcher.reset(Matcher.java:416)
at java.base/java.util.regex.Matcher.<init>(Matcher.java:253)
at java.base/java.util.regex.Pattern.matcher(Pattern.java:1147)
at java.base/java.util.regex.Pattern.split(Pattern.java:1264)
at java.base/java.util.regex.Pattern.split(Pattern.java:1335)
at sbt.IO$.pathSplit(IO.scala:797)
at sbt.IO$.parseClasspath(IO.scala:912)
at sbt.compiler.CompilerArguments.extClasspath(CompilerArguments.scala:66)
at sbt.compiler.MixedAnalyzingCompiler$.withBootclasspath(MixedAnalyzingCompiler.scala:188)
at sbt.compiler.MixedAnalyzingCompiler$.searchClasspathAndLookup(MixedAnalyzingCompiler.scala:166)
at sbt.compiler.MixedAnalyzingCompiler$.apply(MixedAnalyzingCompiler.scala:176)
at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
at sbt.Compiler$.compile(Compiler.scala:152)
at sbt.Compiler$.compile(Compiler.scala:138)
at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(Defaults.scala:860)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:851)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:849)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:237)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.base/java.lang.Thread.run(Thread.java:844)
[error] (compile:compileIncremental) java.lang.NullPointerException
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? l
[info] Loading project definition from /Users/himanshugupta/spark/flint-master/project
[debug] Forcing garbage collection...
java.lang.NullPointerException
at java.base/java.util.regex.Matcher.getTextLength(Matcher.java:1769)
at java.base/java.util.regex.Matcher.reset(Matcher.java:416)
at java.base/java.util.regex.Matcher.<init>(Matcher.java:253)
at java.base/java.util.regex.Pattern.matcher(Pattern.java:1147)
at java.base/java.util.regex.Pattern.split(Pattern.java:1264)
at java.base/java.util.regex.Pattern.split(Pattern.java:1335)
at sbt.IO$.pathSplit(IO.scala:797)
at sbt.IO$.parseClasspath(IO.scala:912)
at sbt.compiler.CompilerArguments.extClasspath(CompilerArguments.scala:66)
at sbt.compiler.MixedAnalyzingCompiler$.withBootclasspath(MixedAnalyzingCompiler.scala:188)
at sbt.compiler.MixedAnalyzingCompiler$.searchClasspathAndLookup(MixedAnalyzingCompiler.scala:166)
at sbt.compiler.MixedAnalyzingCompiler$.apply(MixedAnalyzingCompiler.scala:176)
at sbt.compiler.IC$.incrementalCompile(IncrementalCompiler.scala:138)
at sbt.Compiler$.compile(Compiler.scala:152)
at sbt.Compiler$.compile(Compiler.scala:138)
at sbt.Defaults$.sbt$Defaults$$compileIncrementalTaskImpl(Defaults.scala:860)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:851)
at sbt.Defaults$$anonfun$compileIncrementalTask$1.apply(Defaults.scala:849)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:237)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.base/java.lang.Thread.run(Thread.java:844)
[error] (compile:compileIncremental) java.lang.NullPointerException
[debug] > load-failed
[debug] > last

Any idea what's causing this and how I can fix it?

Thanks

Time Series Clustering

Hello, I'm working on Time Series Clustering. Is there any module in ts-flint for time series clustering?

Expose close summarizer to Python API

I have been using the summarizers (mean, count, max, min) from the Python API for the last couple of weeks. I use them together with a clock, which gives me uniform timestamps for one-minute bars. Now I am wondering whether it is possible to have a summarizer in Python that gives me the close value (the last value of the interval). I am dealing with price data, and I saw that the Scala implementation of the summarizers has the "close" functionality. Is it possible to expose this summarizer in the Python API?

Thank you very much for your support
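
To make the request concrete, here is a minimal plain-Python sketch of what a "close" value per bar means. It does not use Flint: the function name close_per_interval and the half-open bucketing [start, start + interval) are assumptions for illustration, and Flint's clocks may assign boundary rows differently.

```python
from collections import OrderedDict


def close_per_interval(rows, interval_seconds=60):
    """Return the last value seen in each interval, keyed by interval start.

    `rows` is an iterable of (epoch_seconds, value) pairs sorted by time.
    """
    closes = OrderedDict()
    for ts, value in rows:
        bucket = ts - ts % interval_seconds  # start of the bar containing ts
        closes[bucket] = value               # later rows overwrite earlier ones
    return closes


# Hypothetical price ticks: three in the first minute, two in the second.
prices = [(0, 10.0), (30, 10.5), (59, 10.2), (60, 11.0), (119, 10.8)]
bars = close_per_interval(prices)
print(bars)
```

Each bar keeps only the final tick that falls inside it, which is the behaviour being asked for alongside mean, count, max and min.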

"leftJoin" returns a dataframe that is smaller than left_df

Hi all:

I found that leftJoin generates a dataframe that is smaller than the left dataframe:

[In] [1]:  joined_flint = left_flint.leftJoin(right_flint, tolerance=tolerance, key=by)  
[In] [2]:  print (joined_flint.count() < left_flint.count())
True

I consider this an incorrect result, since a left join should not drop any rows from the left table.
Is there an explanation or a suggestion?
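
The expectation can be stated as an invariant: the output has exactly one row per left row, with a null on the right side when nothing matches within the tolerance. The sketch below is plain Python, not Flint's implementation; the name left_join_asof, the inclusive tolerance window and the omission of the key argument are all assumptions made for illustration.

```python
def left_join_asof(left, right, tolerance):
    """Keep every left row, attaching the latest right value whose timestamp
    lies in [t - tolerance, t], or None when no such row exists.

    Both inputs are lists of (timestamp, value) pairs sorted by timestamp.
    """
    joined = []
    j = 0
    latest = None
    for t, left_value in left:
        # Advance through right rows that are not in the future of t.
        while j < len(right) and right[j][0] <= t:
            latest = right[j]
            j += 1
        if latest is not None and t - latest[0] <= tolerance:
            joined.append((t, left_value, latest[1]))
        else:
            joined.append((t, left_value, None))  # unmatched rows are kept
    return joined


left = [(1, "a"), (5, "b"), (20, "c")]
right = [(0, "x"), (4, "y")]
result = left_join_asof(left, right, tolerance=2)
print(result)
```

Under these semantics len(result) always equals len(left), so a smaller count after the join would point to rows being dropped somewhere else.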

Both `.option("timeColumn","ds")` and `.parquet(path, time_column='ds')` don't work

I have tried the following:


df = fc.read.option("timeColumn","ds").option('isSorted', False).dataframe(spark.read.parquet('/test/SALECOUNT_OUT'))

df = fc.read.option("timeColumn","ds").option('isSorted', False).parquet('/test/SALECOUNT_OUT')

df = fc.read.option('isSorted', False).parquet('/test/SALECOUNT_OUT', time_column='ds')

Each of them throws the following error:

Py4JJavaError: An error occurred while calling o91.canonizeTime.
: java.lang.IllegalArgumentException: Field "time" does not exist.
Available fields: store_id, product_id, store_product_id, sale_count, ds
        at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
        at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.sql.types.StructType.apply(StructType.scala:273)
        at com.twosigma.flint.timeseries.TimeSeriesRDD$.canonizeTime(TimeSeriesRDD.scala:123)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-3-95bfec85ec04> in <module>
----> 1 df = fc.read.option("timeColumn", "ds").parquet('/test/SALECOUNT_OUT')

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/readwriter.py in parquet(self, *paths)
    399         """
    400         df = self._sqlContext.read.parquet(*paths)
--> 401         return self.dataframe(df)
    402
    403     def _reconcile_reader_args(self, begin=None, end=None, timezone='UTC',

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/readwriter.py in dataframe(self, df, begin, end, timezone, is_sorted, time_column, unit)
    362             time_column=time_column,
    363             is_sorted=is_sorted,
--> 364             unit=self._parameters.timeUnitString())
    365
    366     def parquet(self, *paths):

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/dataframe.py in _from_df(df, time_column, is_sorted, unit)
    248                                    time_column=time_column,
    249                                    is_sorted=is_sorted,
--> 250                                    unit=unit)
    251
    252     @staticmethod

~/.conda/envs/py3/lib/python3.7/site-packages/ts/flint/dataframe.py in __init__(self, df, sql_ctx, time_column, is_sorted, unit, tsrdd_part_info)
    133         # throw exception
    134         if time_column in df.columns:
--> 135             self._jdf = self._jpkg.TimeSeriesRDD.canonizeTime(self._jdf, self._junit)
    136
    137         if tsrdd_part_info:

/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'Field "time" does not exist.\nAvailable fields: store_id, product_id, store_product_id, sale_count, ds'

`ds` is of timestamp type.
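
Judging from the traceback, canonizeTime looks up a column literally named "time". One possible workaround, not verified against any particular ts-flint release, is to rename the column before handing the DataFrame to Flint. The helper name read_with_renamed_time is hypothetical; it relies only on Spark's DataFrame.withColumnRenamed and on the fc.read.option(...).dataframe(...) calls already shown above.

```python
def read_with_renamed_time(fc, df, time_column, is_sorted=False):
    """Rename `time_column` to "time" and pass the result to Flint's reader.

    `fc` is expected to be a FlintContext and `df` a Spark DataFrame.
    This helper is an illustration and is not part of ts-flint.
    """
    renamed = df.withColumnRenamed(time_column, "time")
    return fc.read.option("isSorted", is_sorted).dataframe(renamed)


# Intended usage (requires a Spark session and a FlintContext):
# df = read_with_renamed_time(fc, spark.read.parquet('/test/SALECOUNT_OUT'), 'ds')
```

This sidesteps the option rather than fixing it, so the resulting time series exposes the column as time instead of ds.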

"timeColumn" option not respected in a "read.dataframe" call

@icexelloss hi there!

I'm glad the issues are being (pro)actively monitored and attended to, I wasn't expecting that.

Here is one issue I'm facing; it's not a big one, but it is inconvenient:

print( sc.version )
print( tm )

n = df.filter( df['Container'] == 'dbc94d4e3af6' ).select( tm, 'MemPercentG', 'CpuPercentG' )
n.show( truncate = False )
n.printSchema()

from ts.flint import FlintContext, clocks
from ts.flint import utils
  
fc = FlintContext( sqlContext )

r = fc.read \
    .option('isSorted', False) \
    .option('timeUnit', 's') \
    .option('timeColumn', tm) \
    .dataframe( n )

The output is:

2.3.1
TimeStamp
+-------------------+---------------+------------+
|TimeStamp          |MemPercentG    |CpuPercentG |
+-------------------+---------------+------------+
|2018-08-01 05:55:35|0.0030517578125|0.002331024 |
|2018-08-01 05:58:05|0.0030517578125|0.0031538776|
|2018-08-01 05:59:05|0.0030517578125|0.0030176123|
+-------------------+---------------+------------+

root
 |-- TimeStamp: timestamp (nullable = true)
 |-- MemPercentG: double (nullable = true)
 |-- CpuPercentG: float (nullable = true)

IllegalArgumentException: 'Field "time" does not exist.\nAvailable fields: TimeStamp, MemPercentG, CpuPercentG'
---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<command-911439891027714> in <module>()
     14 fc = FlintContext( sqlContext )
     15 
---> 16 r = fc.read     .option('isSorted', False)     .option('timeUnit', 's')     .option('timeColumn', tm)     .dataframe( n )
     17 
     18 

/databricks/python/lib/python3.5/site-packages/ts/flint/readwriter.py in dataframe(self, df, begin, end, timezone, is_sorted, time_column, unit)
    362             time_column=time_column,
    363             is_sorted=is_sorted,
--> 364             unit=self._parameters.timeUnitString())
    365 
    366     def parquet(self, *paths):

/databricks/python/lib/python3.5/site-packages/ts/flint/dataframe.py in _from_df(df, time_column, is_sorted, unit)
    248                                    time_column=time_column,
    249                                    is_sorted=is_sorted,
--> 250                                    unit=unit)
    251 
    252     @staticmethod

/databricks/python/lib/python3.5/site-packages/ts/flint/dataframe.py in __init__(self, df, sql_ctx, time_column, is_sorted, unit, tsrdd_part_info)
    133         # throw exception
    134         if time_column in df.columns:
--> 135             self._jdf = self._jpkg.TimeSeriesRDD.canonizeTime(self._jdf, self._junit)
    136 
    137         if tsrdd_part_info:

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'Field "time" does not exist.\nAvailable fields: TimeStamp, MemPercentG, CpuPercentG'

Is this reproducible for you?

Please advise whether I'm calling it incorrectly or whether it's a bug.

I installed the Flint libraries (both the Scala and the Python ones) on Databricks via its UI, from the respective online repositories, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help.

Thanks.

[Python] Better argument checking

The Flint Python API currently doesn't check argument types, which leads to a bad user experience.

For instance, summarizer.linearRegression takes an argument "xcols" as a list of strings; if the user passes a single string, it throws a confusing exception:

Py4JError: An error occurred while calling z:com.twosigma.huohua.timeseries.summarize.Summary.linearRegression. Trace:
py4j.Py4JException: Method linearRegression([class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.Boolean]) does not exist
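
A minimal sketch of the kind of check that could run on the Python side before the call reaches Py4J. The helper name check_string_list is hypothetical and not part of ts-flint; it only illustrates failing early with a readable message.

```python
def check_string_list(name, value):
    """Return `value` as a list if it is a list or tuple of strings.

    Raises TypeError with a descriptive message otherwise.
    """
    if isinstance(value, str):
        # The most common mistake: a bare column name instead of a list.
        raise TypeError(
            "{} must be a list of column names, got the string {!r}; "
            "did you mean [{!r}]?".format(name, value, value))
    if not isinstance(value, (list, tuple)) or not all(
            isinstance(item, str) for item in value):
        raise TypeError(
            "{} must be a list of strings, got {!r}".format(name, value))
    return list(value)


# Passing a list succeeds; passing a bare string raises TypeError.
columns = check_string_list("xcols", ["x1", "x2"])
print(columns)
```

Raising a TypeError in Python keeps the message in terms the caller used, instead of surfacing a missing Java method signature.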

Failure of test: "correctly convert SQL TimestampType with default format" in "CSVSpec.scala".

Running sbt assembly yields the following:

[info] - should correctly convert SQL TimestampType with default format *** FAILED *** (244 milliseconds)
[info]   1199232000000000000 did not equal 1199253600000000000 (CSVSpec.scala:100)

Now, 1199232000000000000, the result of first.getAs[Long]("time"), where first contains the string "2008-01-02 00:00:00", corresponds to 2008-01-01 18:00:00 in my local time, which is in turn equal to "2008-01-02 00:00:00 UTC".

1199253600000000000, on the other hand, corresponds to "2008-01-02 00:00:00" in my local time. Since its timezone was extracted from format.setTimeZone(TimeZone.getDefault) (my computer resides in Mexico City, UTC-6), I believe the string is being interpreted as "2008-01-02 00:00:00 UTC-6".

tl;dr: I believe the test reads "2008-01-02 00:00:00" from the CSV assuming UTC, whereas the expected value is built by simply applying my default timezone (UTC-6) to the hard-coded string "2008-01-02 00:00:00".
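
The two nanosecond values from the failing assertion can be decoded with the standard library to see the gap between them. This is a minimal sketch; the helper name ns_to_utc is made up for illustration.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1000000000


def ns_to_utc(nanos):
    """Convert nanoseconds since the Unix epoch to an aware UTC datetime."""
    seconds = nanos // NANOS_PER_SECOND
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


actual = ns_to_utc(1199232000000000000)    # value read from the CSV
expected = ns_to_utc(1199253600000000000)  # value the test compares against
gap = expected - actual

print(actual.isoformat())
print(expected.isoformat())
print(gap)
```

The difference comes out to six hours, which matches a default timezone of UTC-6 being applied on only one side of the comparison.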

java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg

Hi - I am getting the following error when trying to run the Python example:

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Users/__/flint-master/python/ts/flint/dataframe.py", line 592, in summarize
    tsrdd = self.timeSeriesRDD.summarize(composed_summarizer._jsummarizer(self._sc), scala_key)
  File "/Users/__/flint-master/python/ts/flint/dataframe.py", line 133, in timeSeriesRDD
    self._jdf, self._is_sorted, self._junit, self._time_column)
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/__/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.6/site-packages/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o284.fromDF.
java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame(Lorg/apache/spark/rdd/RDD;Lorg/apache/spark/sql/types/StructType;)Lorg/apache/spark/sql/Dataset;
	at org.apache.spark.sql.DFConverter$.toDataFrame(DFConverter.scala:37)
	at com.twosigma.flint.timeseries.TimeSeriesStore$.apply(TimeSeriesStore.scala:72)
	at com.twosigma.flint.timeseries.TimeSeriesStore$.apply(TimeSeriesStore.scala:59)
	at com.twosigma.flint.timeseries.TimeSeriesRDD$.fromDFWithPartInfo(TimeSeriesRDD.scala:388)
	at com.twosigma.flint.timeseries.TimeSeriesRDD$.fromDF(TimeSeriesRDD.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)

Is this due to Spark support being limited to 2.0?
Thank you!!

"Rankers" Import error

I can successfully open the PySpark shell with the command provided in your python/README.md file:

pyspark --master=local --jars /path/to/assembly/flint-assembly-0.6.0-SNAPSHOT.jar --py-files /path/to/assembly/flint-assembly-0.2.0-SNAPSHOT.jar

However, when I try to import the ts.flint module with the command

import ts.flint

I always get the error

cannot import name 'rankers'

Am I missing something? The file python/ts/flint/dataframe.py contains the statement

from . import rankers

but I cannot find any such file in the repository.

Any help would be appreciated.

Unable to load Timeseries RDD from Parquet without sorting

Hi,

If I save a TimeSeriesRDD to Parquet as described in the documentation and then try to read it back without sorting, it will often fail with an error of the form:

"Partitions are not sorted. The partition 0 has the first key 1541810691000000000 and the partition 1 has the first key 1541780937000000000."

After a bit more research, I can see you have PR#16, which I think describes the issue in more detail and says that you are working around it using a custom patched version of Spark. The PR is a year or so old, so I presume it doesn't solve the issue?

If that is the case, is there any other workaround that would let me save a TimeSeriesRDD to disk and subsequently reload it without paying the cost of a sort?
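
The error message describes a condition on the first key of consecutive partitions. The sketch below expresses that condition in plain Python using the two keys from the message; it is an illustration of the check as worded in the error, not Flint's actual code, and the function name first_unsorted_partition is hypothetical.

```python
def first_unsorted_partition(first_keys):
    """Return the index i of the first partition whose first key is greater
    than the first key of partition i + 1, or None if keys never decrease."""
    for i in range(len(first_keys) - 1):
        if first_keys[i] > first_keys[i + 1]:
            return i
    return None


# First keys of partition 0 and partition 1, taken from the error message.
keys = [1541810691000000000, 1541780937000000000]
offending = first_unsorted_partition(keys)
print(offending)
```

Here partition 0 starts later than partition 1, which suggests the files were read back in a different order than they were written.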
