databricks / koalas
Koalas: pandas API on Apache Spark
License: Apache License 2.0
I think that there are a bunch of design considerations that will come up.
For example, when doing `pd.DataFrame.sample()`, pandas guarantees exact records for `n` and an exact fraction when using `frac`. When implementing them, which behavior will Koala decide to keep?
Option 1 could be implemented as: for `frac`, use Spark's behavior of `fraction`.
Option 2 could be implemented as: for `n`, do a count and use Spark's `sample()` with `fraction=n/df.count()`.
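A minimal sketch of that count-based approach in plain PySpark (this is an illustration, not Koalas' implementation):

```python
from pyspark.sql import DataFrame

def sample_n(sdf: DataFrame, n: int, seed=None) -> DataFrame:
    """Approximate pandas' exact-n sample: count, oversample slightly
    (Spark's fraction is only approximate), then trim to n rows."""
    total = sdf.count()  # extra job: one full count of the DataFrame
    if n >= total:
        return sdf
    fraction = min(1.0, 1.1 * n / total)
    return sdf.sample(withReplacement=False, fraction=fraction, seed=seed).limit(n)
```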
See https://pypi.org/project/databricks-koalas/
It just pulls in our entire README in markdown format.
`spark.from_pandas` in the following example should be `koalas.from_pandas`, and the import `from databricks import koalas` is also missing:
import pandas as pd
pdf = pd.DataFrame({'x':range(3), 'y':['a','b','b'], 'z':['a','b','b']})
df = spark.from_pandas(pdf)
# Rename the columns
df.columns = ['x', 'y', 'z1']
# Do some operations in place:
df['x2'] = df.x * df.x
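For reference, the corrected snippet as described above would be:

```python
import pandas as pd
from databricks import koalas

pdf = pd.DataFrame({'x': range(3), 'y': ['a', 'b', 'b'], 'z': ['a', 'b', 'b']})
df = koalas.from_pandas(pdf)  # was: spark.from_pandas(pdf)

# Rename the columns
df.columns = ['x', 'y', 'z1']

# Do some operations in place:
df['x2'] = df.x * df.x
```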
Move `read_csv` and `read_parquet` to monkey patch onto `SparkSession` instead of the `pyspark` package.
Currently we always use `default_session` in `namespace.py`, but the session might be different from the one some users expect.
We should also make `pyspark.read_csv` redirect to `default_session().read_csv()` for most users.
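A rough sketch of what that monkey patch could look like (the session-aware reader and its signature below are assumptions for illustration, not existing code):

```python
from pyspark.sql import SparkSession

def _session_read_csv(self, path, **kwargs):
    # Hypothetical: a koalas-level reader that accepts an explicit session,
    # so the result is tied to *this* session rather than default_session().
    from databricks.koalas.namespace import read_csv  # assumed module layout
    return read_csv(path, session=self, **kwargs)     # assumed signature

# Attach the method to SparkSession instead of the top-level pyspark package.
SparkSession.read_csv = _session_read_csv
```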
It still says "pandorable_sparky":
pip install https://s3-us-west-2.amazonaws.com/databricks-tjhunter/pandorable_sparky/pandorable_sparky-0.0.5-py3-none-any.whl
We have enough content to make a release onto PyPI. The goal of this ticket is to:
- write docs/RELEASE.md
- pick a version, for example 0.1.0, because this is already pretty usable.

Travis CI has been enabled for the repository and I have added the button to the readme.
This ticket is to add a (simple) test and make sure that the linter runs on each run. Here is an example of a .travis.yml file that shows how this is being done:
https://github.com/databricks/spark-sklearn/blob/master/.travis.yml
I believe we should now treat `isnull` and `isna` as aliases, like in Pandas:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.isnull.html#pandas.Series.isnull
@ueshin has an implementation already here:
https://github.com/databricks/spark-pandas/pull/63/files#diff-29cc007174a75f4d0a66a7e9ef40cd74R288
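A tiny illustration of the alias approach (not the code from the linked PR):

```python
class Series:
    def isna(self):
        """Detect missing values; the real implementation lives in the PR above."""
        ...

    # Treat isnull as an alias of isna, mirroring pandas.
    isnull = isna
```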
These are the patterns that I have seen with the apply() function:
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3, 4, 5, 6, 7], 'b': [7, 6, 5, 4, 3, 2, 1]})
# Option 1: Use a series and apply on it to run a UDF (Technically a Series.apply)
df['c'] = df['a'].apply(lambda x: x*2)
# Option 2: Use the entire row from a dataframe
df['d'] = df.apply(lambda x: x.a*x.b, axis=1)
# Option 3: Use some columns in a row
df['e'] = df[['a', 'b']].apply(lambda x: x.a*x.b, axis=1)
# Option 4: Apply aggregates on every column to create a new dataframe
new_df = df.apply(lambda x: x.sum())
# Option 5: Apply aggregates but return multiple values - here it pads the shorter columns with NaNs
new_df = df.apply(lambda x: x[:x[0]])
This functionality for selecting columns works in both Spark and Pandas, but is broken in Koala.
In [1]: from datetime import datetime
...:
...: import pandas as pd
...: import numpy as np
...:
...: import findspark; findspark.init(); import pyspark
...: from pyspark.sql import types as T, functions as F
...:
In [2]: spark = pyspark.sql.SparkSession.builder.getOrCreate()
In [3]: # Get data
...: data = {'a{}'.format(i): [1,2,3,4] for i in range(2)}
...: data.update({'b{}'.format(i): ['a', 'b ', ' c', ' d '] for i in range(2)})
...: data.update({'c{}'.format(i): [[1,2], None, [3,4], [4,5,6]] for i in range(2)})
...: df = pd.DataFrame(data)
...: sdf = spark.createDataFrame(df)
...:
In [4]: sdf[['a0', 'a1']].show()
+---+---+
| a0| a1|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
+---+---+
In [5]: import databricks.koala
In [6]: sdf[['a0', 'a1']].show()
---------------------------------------------------------------------------
SparkPandasNotImplementedError Traceback (most recent call last)
<ipython-input-6-c2947ceaf814> in <module>()
----> 1 sdf[['a0', 'a1']].show()
~/Documents/oss/databricks/spark-pandas/databricks/koala/structures.py in __getitem__(self, key)
619
620 def __getitem__(self, key):
--> 621 return anchor_wrap(self, self._pd_getitem(key))
622
623 def __setitem__(self, key, value):
~/Documents/oss/databricks/spark-pandas/databricks/koala/structures.py in _pd_getitem(self, key)
605 raise NotImplementedError(key)
606 if isinstance(key, list):
--> 607 return self.loc[:, key]
608 if isinstance(key, DataFrame):
609 # TODO Should not implement alignment, too dangerous?
~/Documents/oss/databricks/spark-pandas/databricks/koala/selection.py in __getitem__(self, key)
132 df = df._spark_where(reduce(lambda x, y: x & y, cond))
133 else:
--> 134 raiseNotImplemented("Cannot use slice for MultiIndex with Spark.")
135 elif isinstance(rows_sel, string_types):
136 raiseNotImplemented("Cannot use a scalar value for row selection with Spark.")
~/Documents/oss/databricks/spark-pandas/databricks/koala/selection.py in raiseNotImplemented(description)
102 description=description,
103 pandas_function=".loc[..., ...]",
--> 104 spark_target_function="select, where")
105
106 rows_sel, cols_sel = _unfold(key, self.col)
SparkPandasNotImplementedError: Cannot use slice for MultiIndex with Spark. You are trying to use pandas function .loc[..., ...], use spark function select, where
In [7]: df[['a0', 'a1']]
Out[7]:
a0 a1
0 1 1
1 2 2
2 3 3
3 4 4
@GregOwen suggested using Koalas because it is more of a nod to "Pandas".
We should adapt the test suites of Dask to make sure that we are not breaking anything:
- The license allows us to copy the code, if we put the proper attribution in the README.
- dask uses a .compute() method, which we should add as an alias to .toPandas() to simplify the porting of tests.
A good test suite to begin with is probably
https://github.com/dask/dask/blob/master/dask/dataframe/tests/test_dataframe.py
It is very long, so we can start with the first few tests to see how much change we need to do. In general, given the size of the testing code base, we should aim at limiting the changes we bring to it.
We should have a short tutorial that quickly explains:
The aim is to produce a simple tutorial based on the sections of the 10 minutes to pandas tutorial that are already implemented, and mark the remainder as not implemented yet.
In particular, the tutorial should answer the following questions:
Work has been started in this pull request, and more contributions are welcome:
#34
TODO: move this to a google doc for proper design.
All these are mostly implemented in https://github.com/databricks/spark-pandas/blob/master/pandorable_sparky/typing.py#L86
A couple of changes can make the current Pandas UDFs more friendly for non-spark users:
- Support native python types: users should not have to know the spark datatype objects.
- Use python3 typing for expressing types: type hints are required by spark, and now they can be expressed naturally in python 3 too. You can quickly declare a python UDF with the following pythonic syntax:
@spark_function
def my_udf(x: Col[int]) -> Col[double]:
    pass
The input type is optional, but it can then be used to do either casting or type checking, just like in Scala.
An alternative syntax for expressing the same thing in python 2 could be:
@spark_udf(col_x=int, col_return=double)
def my_udf(x): pass
Similarly for reducers. The following function can be automatically inferred to be a UDAF because it returns an integer:
@spark_function
def my_reducer(x: Col[int]) -> int:
    pass
Broadcasting: for example, for UDFs that take multiple columns as arguments, a scalar should be accepted and treated as a SQL `lit()`.
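As a rough illustration of the type-hint idea (assumed names, using plain Python UDFs rather than pandas UDFs, and not the project's implementation), a decorator could map native Python annotations to Spark types like this:

```python
from typing import get_type_hints

from pyspark.sql import functions as F, types as T

# Hypothetical mapping from native Python types to Spark SQL types.
_PY_TO_SPARK = {int: T.LongType(), float: T.DoubleType(), str: T.StringType()}

def spark_function(f):
    """Infer the Spark return type from the function's return annotation."""
    return_type = _PY_TO_SPARK[get_type_hints(f)["return"]]
    return F.udf(f, returnType=return_type)

@spark_function
def doubled(x: int) -> float:
    return x * 2.0
```

A column could then be transformed with something like `df.select(doubled(df.x))`.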
Useful and relevant code:
https://github.com/databricks/spark-pandas/blob/master/pandorable_sparky/typing.py#L86
https://github.com/databricks/Koala/blob/master/potassium/utils/spark_utils.py
https://github.com/dvgodoy/handyspark/blob/master/handyspark/sql/schema.py
After spending some time thinking about the issue, I think it would be better for the project to create its own entry point and its own DataFrame / Series class, rather than doing it through monkey patching or inheritance to change the existing Spark DataFrame API.
The main reason I am leaning this way now is that Spark and Pandas DataFrames already have quite a few functions with the same name but different semantics, e.g. `sample`, `head`. Keeping existing Spark behavior does not accomplish the goal of the project, while changing existing Spark behavior based on some import statement is an anti-pattern in Python code. Doing it through encapsulation also avoids the issue of Koalas polluting the internal state of Spark's DataFrame through monkey patching.
So here's an alternative design:
- koalas: parallel to the pandas namespace. Includes I/O functions (`read_csv`, `read_parquet`) as well as general functions (e.g. `concat`, `get_dummies`). Functions here would only work if there is an active SparkSession.
- koalas.DataFrame: a completely new interface based on Pandas' DataFrame API. KoalaDataFrame wraps around a Spark DataFrame along with additional internal state.
- koalas.Series: based on Pandas' Series API.
The only thing I'd monkey patch into Spark's DataFrame is a toKoalas method, which turns an existing Spark DataFrame into a Koalas DataFrame.
While this change will bring more code, we will get the following benefits:
The main tradeoff is:
If we go with encapsulation, there is a separate class hierarchy in addition to the existing Spark ones, and the new hierarchy strictly follows Pandas semantics. The downside is that users cannot directly use a Spark DataFrame with Pandas code; they need to call "spark_ds.toKoalas".
If we go with the current monkey patching approach, it's the opposite of the above. Users can directly use a Spark DataFrame with Pandas code, but existing Spark code's behavior might change as soon as they import the koalas package. For example, "head" now returns 5 rows rather than 1 row, "columns" returns a Pandas Index rather than a Python list, and "sample" might also change in the future.
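A minimal sketch of that single monkey patch (the `koalas.DataFrame(spark_df)` constructor here is an assumption for illustration, not the actual API):

```python
from pyspark.sql import DataFrame as SparkDataFrame

from databricks import koalas  # assumed package layout

def toKoalas(self):
    """Wrap this Spark DataFrame in a Koalas DataFrame; existing Spark
    behavior is left untouched."""
    return koalas.DataFrame(self)  # hypothetical constructor accepting a Spark DataFrame

# The only attribute this design would add to Spark's DataFrame.
SparkDataFrame.toKoalas = toKoalas
```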
In [5]: # Get data
...: data = {'a{}'.format(i): [1,2,3,4] for i in range(2)}
...: data.update({'b{}'.format(i): ['a', 'b ', ' c', ' d '] for i in range(2)})
...: df = pd.DataFrame(data)
...: sdf = spark.createDataFrame(df)
...:
In [6]: import databricks.koala
In [7]: df = df.set_index('a0')
...: sdf = sdf.set_index('a0')
...:
In [8]: sdf['a0']
a0
1 1
2 2
3 3
4 4
Name: a0, dtype: int64
In [9]: df['a0']
Getting the index as a column seems to work in spark, but fails in pandas.
I guess I'm again coming back to the question: what should be the expected behavior in koala?
Is an index considered a column? (I know internally it is a column... but I meant from a usability perspective.) Pandas does not think so.
I realize this is a Pandas issue, but it will impact folks who want to switch between pandas and koalas/Spark.
If you write a file out from Spark (e.g. CSV or Parquet), you cannot read it in from Pandas (you can read in the individual files, though).
A lot of folks have complained about this in various places:
dask/fastparquet#159
uber/petastorm#321
Would be awesome if we had a workaround.
I think it would be awesome if some sort of readthedocs-like documentation could be started.
I have begun exploring some stuff, and rather than storing it all in my head, I could write it out so that it becomes a reference for others and me.
In Pandas, there is no good way to reverse get_dummies. This has been a long-standing issue (since 2014) with Pandas.
Sklearn and SparkML both have ways to reverse OHE. Would be amazing if Koalas supported this!
I notice that the `patch_spark()` documentation says it cannot be reverted.
That can be a bit scary, especially because I have a bunch of code where I'd love to use koala for the parts I am writing, but I also need to call third-party python+spark libraries I cannot control, and they may break.
Ideally, I'd love to be able to do something like:
# Some sparky stuff - Create DF1, DF2
with init_koala():
    # Koala stuff with DF1
    ...
# More sparky stuff with DF2
Implementation Ideas:
I think a very simple implementation of this could be a dict() of all the items it is patching, mapped to the old versions of those, and then reverting using that dict. Just need a bit of book-keeping.
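A minimal sketch of that book-keeping, assuming we know which attributes `patch_spark()` touches (the attribute list and import path below are illustrative assumptions):

```python
from contextlib import contextmanager

from pyspark.sql import DataFrame as SparkDataFrame

from databricks.koala import patch_spark  # assumed import path

@contextmanager
def init_koala():
    # Remember what was defined (or not) for every attribute we are about to patch.
    patched_names = ['head', 'columns', 'sample']  # illustrative list of patched attributes
    saved = {name: SparkDataFrame.__dict__.get(name) for name in patched_names}
    try:
        patch_spark()
        yield
    finally:
        # Put the old attributes back, or remove ones that did not exist before.
        for name, old in saved.items():
            if old is None:
                if name in SparkDataFrame.__dict__:
                    delattr(SparkDataFrame, name)
            else:
                setattr(SparkDataFrame, name, old)
```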
Let's have a small script that builds the project and pushes a new release onto databricks using the Databricks CLI. Pushing a new release onto DBFS is enough for users to attach to a cluster.
We should add https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.map.html#pandas.Series.map, handling just dictionaries of simple types for now. It should be a simple wrapper around the pandas implementation for the time being. This is useful for simple value substitution.
The only option that is relevant is columns
Pandas: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_parquet.html#pandas.read_parquet
Dask test suite:
https://github.com/dask/dask/blob/master/dask/dataframe/io/tests/test_parquet.py
I think we need this behavior especially with a property like `values`. We will load all the data in `values` when we create a new DataFrame, and this becomes heavy when the data is huge.
I found a good trick here and I think it can be useful. We just need to override `__get__()`.
Any ideas related to the topic that we can use in Koalas?
For instance, we can solve `values` by replacing it with `to_numpy()`.
The Pandas documentation suggests:
> Warning: We recommend using DataFrame.to_numpy() instead.
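A generic illustration of the `__get__()` trick (not Koalas code): a non-data descriptor that computes a heavy attribute only on first access and caches it on the instance, so it is never materialized for DataFrames that never use it:

```python
class lazy_property:
    """Compute the wrapped attribute on first access and cache it on the instance."""

    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        value = self.func(obj)
        # Cache on the instance; later lookups bypass the descriptor entirely.
        obj.__dict__[self.name] = value
        return value


class Frame:
    @lazy_property
    def values(self):
        print("collecting...")  # stands in for an expensive toPandas()/collect
        return [1, 2, 3]
```

Because the descriptor only defines `__get__`, the cached instance attribute takes precedence on all subsequent lookups.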
Potentially a few things to add to the "what is supported" Excel sheet (taken from my list of common operations in pandas):
https://docs.google.com/spreadsheets/d/1GwBvGsqZAFFAD5PPc_lffDEXith353E1Y7UV6ZAAHWA/edit
pd.options: options should be considered at some point.
For example, I use `with pd.option_context('mode.use_inf_as_null', True):` together with `data = data[[colname]].dropna()` to drop both INFs and NAs from a dataframe.
These should be documented as not implemented right now, as they do change results.
And I had a question, as I see `set_index` and `reset_index` in the Excel sheet right now.
Is the plan that this library will implement the indexing logic on top of Spark? (As Spark DataFrames don't have indexing.)
I.e., will the index be tracked in koala as a normal column in the Spark DataFrame, but with all index functionality working?
Since the documentation step in an external spreadsheet is proving to be too tedious to be respected, let's simply add most of the missing methods as stubs. These methods should:
- raise NotImplementedError (with a message asking for contributions and that includes the name of the missing function).
It should be pretty easy to implement a method that always throws for these functions.
We should publish documentation to explain the changes that this package brings:
This probably depends on #90
Raising this to discuss what the handling of `.dtypes` should be.
Current implementations:
Other Differences:
object
Copy and paste the content in the Google doc under "# What is available" with the one in https://hackmd.io/auth/github, and update the readme.
Right now they look like:
def add(self, other, axis='columns', level=None, fill_value=None):
    """A stub for the equivalent method to `pd.DataFrame.add()`.

    The method `pd.DataFrame.add()` is not implemented yet.
    """
    raise PandasNotImplementedError(class_name='pd.DataFrame', method_name='add')
Should we simplify it to something like
def add(**kwargs):
    unsupported_function()
or
add = unsupported_function()
or even
unsupported_function('add')
Reason I think we should simplify:
Scenario:
import numpy as np
import pandas as pd

# Create Small Sample Dataset
dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
kdf = spark.from_pandas(pdf)
When running this via Pandas, this works as expected:
pdf['D'][0]
Out[107]: 0.3420981969020352
But with Koalas, we get the following error:
kdf['D'][0]
Out[106]: <repr(<pyspark.sql.column.Column at 0x7fcdeb84e710>) failed: pyspark.sql.utils.AnalysisException: "Can't extract value from D#6026: need struct type but got double;">
This function is going to require more thought than most others, because Spark and Pandas use the same function name (`sample`) to provide slightly different semantics:
Documentation:
Before starting on designing what the expectations should be, here are some constraints:
Some questions which the design doc should explore:
First release (private alpha):
- more complete readme
- clean the print statements
- make a script to pull the list of implemented functions in markdown
- rename the package
I can't subtract two datetimes from each other. I can do this in pandas without issues. See this notebook: https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/12638605/100107/latest.html
Tony would like to show off this feature in his blog.
We need the package to be called `databricks.koala`. It needs to be packaged correctly for that so it does not interfere with future `databricks.other_packages`.
How to do this: https://packaging.python.org/guides/packaging-namespace-packages/
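A minimal `setup.py` sketch using native namespace packages, per the guide above (the distribution name and version are illustrative; in this scheme the `databricks/` directory must not contain an `__init__.py`):

```python
from setuptools import setup, find_namespace_packages

setup(
    name='databricks-koalas',  # illustrative
    version='0.1.0',           # illustrative
    # Only pick up databricks.koala, so future databricks.* distributions can coexist.
    packages=find_namespace_packages(include=['databricks.*']),
)
```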
Based on discussions in #85, we are going to rename the project to Koalas.
They should either be removed or turned into logging calls.
I wrote out a single CSV file using Spark, and am trying to just read in that file directly (not the metadata) using Pandas.
However, I'm getting this "TypeError: field Transport DtTm: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>".
Here's Pandas: https://codecov.io/gh/pandas-dev/pandas
It'd also be great to add a badge similar to how Pandas does it.
Some environments like Databricks ship with outdated versions of python and pyarrow, which causes some deserialization errors. Let's put the exact supported bounds in the requirements. Here is what works from experimentation:
Dask test suite:
https://github.com/dask/dask/blob/master/dask/dataframe/tests/test_indexing.py
Dask implementation:
https://github.com/dask/dask/blob/master/dask/dataframe/indexing.py#L69
HandySpark:
https://github.com/databricks/Koala/blob/master/potassium/selection.py#L38
The dask implementation is very comprehensive; I believe we do not need to implement every single variant unless we can easily copy/paste from dask.
The basic use cases are described here:
https://pandas.pydata.org/pandas-docs/stable/10min.html#getting
Let's not focus on the cases that require an index for now, this requires more thought about how we want to handle indexing.
We should create a CONTRIBUTING.md file that documents how to get started with the project, including build instructions.
The idea is that developers who are familiar with Python and Apache Spark should be able to go from not having anything set up to being able to build, modify, and run tests just by reading the CONTRIBUTING.md file. We should update the "How to contribute" section in README.md to simply link to CONTRIBUTING.md.
Implement a `pyspark.read_csv` method with the following options:
- index_col -> depends on the design of indexing, so let's leave it out for now
- columns
- mangle_dupe_cols
- parse_dates
- comment
- usecols
Documentation:
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
Dask implementation:
https://github.com/dask/dask/blob/master/dask/dataframe/io/csv.py
Dask test suite:
https://github.com/dask/dask/blob/master/dask/dataframe/io/tests/test_csv.py
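A rough sketch (assumed names, only `usecols` and `comment` handled) of how such a `read_csv` could wrap Spark's CSV reader:

```python
from pyspark.sql import SparkSession

def read_csv(path, usecols=None, comment=None):
    """Read a CSV with a pandas-like signature by delegating to Spark's reader."""
    spark = SparkSession.builder.getOrCreate()
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    if comment is not None:
        reader = reader.option("comment", comment)  # Spark supports a single comment character
    sdf = reader.csv(path)
    if usecols is not None:
        sdf = sdf.select(usecols)
    return sdf
```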
As we start to accept contributions, it's important to set the principles in place, so the project will carry forward in a coherent way.
rxin @ C02XT0W6JGH5 : ~/workspace/spark-pandas (master)
> dev/_make_missing_functions.py
Traceback (most recent call last):
File "dev/_make_missing_functions.py", line 22, in <module>
from databricks.koala.frame import PandasLikeDataFrame
ImportError: No module named databricks.koala.frame
Do I need to install koala first? We should add documentation to CONTRIBUTING.md. It'd also be best if this runs against the existing code base, rather than a system-wide installed Koala.
We should eventually support basic indexing, which is a very powerful concept in Pandas, is lightweight enough for Spark, and is ubiquitous in user code.
This requires proper design. Here are some thoughts:
https://docs.google.com/document/d/1PZIojXrafsTA8GOsCaoYMJ44qY4JZR2rNS3HaDF07ao/edit#
Based on a conversation with Brooke, we should have a function called `pyspark.DataFrame` that takes the same arguments as `pandas.DataFrame` (https://github.com/pandas-dev/pandas/blob/master/pandas/core/frame.py#L378) but returns a Spark dataframe. It should be a simple wrapper that calls `pyspark.createDataFrame`.
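A minimal sketch of such a wrapper (not the final API):

```python
import pandas as pd
from pyspark.sql import SparkSession

def DataFrame(data=None, index=None, columns=None, dtype=None, copy=False):
    """Accept the familiar pandas constructor arguments, then hand the result
    to Spark via createDataFrame. Note: the pandas index is not preserved."""
    pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
    return SparkSession.builder.getOrCreate().createDataFrame(pdf)
```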
We should carefully document the functions that are different between Pandas and Spark/Koala, including with a special decorator. Suggestions are welcome here. Here are a couple:
For these functions, the documentation should clearly state:
In addition, we should mark these functions with a Python decorator so that we can easily find the conflicts.
Regarding providing an alternative, I would be in favour of providing a function like `pandas_count` or `pd_count` so that users have an easy way to switch.