mjakubowski84 / parquet4s
Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
Home Page: https://mjakubowski84.github.io/parquet4s/
License: MIT License
I have a use case for filtering a Parquet file using a list of IDs. Building it up using the operators provided seems like it would be unwieldy, and possibly also quite slow. The Parquet library supports user-defined predicates, but, as far as I can tell, they are not exposed by the Filter mechanism.
Is there a way to supply a user-defined predicate, or would it be feasible to add that functionality?
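For reference, parquet-mr itself already supports this via FilterApi.userDefined; here is a rough Scala sketch of the kind of predicate I mean (the column name "id" and the id set are just examples):

import org.apache.parquet.filter2.predicate.{FilterApi, Statistics, UserDefinedPredicate}

class IdSetPredicate(ids: Set[Long]) extends UserDefinedPredicate[java.lang.Long] with Serializable {
  // Keep a record only when its id is in the provided set.
  override def keep(value: java.lang.Long): Boolean = ids.contains(value.longValue)
  // A row group can be skipped when no id from the set falls into its [min, max] range.
  override def canDrop(statistics: Statistics[java.lang.Long]): Boolean =
    !ids.exists(id => id >= statistics.getMin.longValue && id <= statistics.getMax.longValue)
  // Be conservative for the negated predicate: never drop.
  override def inverseCanDrop(statistics: Statistics[java.lang.Long]): Boolean = false
}

val idFilter = FilterApi.userDefined(FilterApi.longColumn("id"), new IdSetPredicate(Set(1L, 2L, 3L)))

It would be great if the Filter mechanism could accept a FilterPredicate like idFilter directly.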
It should read ParquetRecord instead of Map[String, Any]; the code will be much cleaner.
Currently I am working on a use case that involves reading and writing Parquet files to/from Google Cloud Storage from my Akka Streams application.
GCS uses the Cloud Storage Connector (https://github.com/GoogleCloudPlatform/bigdata-interop/tree/master/gcs), which contains the implementation for the GS file scheme. HadoopParquetWriter needs certain Hadoop fs configs to be set in order to interact with GCS (https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md).
I noticed, however, that the current implementation of the ParquetWriter does not pass these configs.
As a quick solution, I forked the ParquetWriter class and modified the customInternalWriter method to accept a Hadoop conf as an argument. This worked beautifully.
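For context, the configuration I pass looks roughly like this (the keys come from the connector's INSTALL.md; the project id and key file path are placeholders):

import org.apache.hadoop.conf.Configuration

val gcsConf = new Configuration()
// Register the GCS file system implementation for the gs:// scheme.
gcsConf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
gcsConf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
// Project and service-account credentials (placeholder values).
gcsConf.set("fs.gs.project.id", "my-gcp-project")
gcsConf.set("google.cloud.auth.service.account.enable", "true")
gcsConf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")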
@mjakubowski84 I wanted to know whether you think this is a reasonable approach. Do you see any potential problems with it?
Happy to submit the PR if you think this is a good solution.
Thanks
Hi,
does parquet4s support partitioning?
E.g. by date: some_key/year=2019/month=9/day=24/
I think this could be quite tedious to implement myself if it is not supported by the lib, if it is possible at all.
Context: I'm trying to replace a Spark job that reads from Kafka and writes to S3 with an Akka Streams application.
Thanks!
... as it might be useful to have a separate pool for writing files
Currently the Hadoop Parquet reader in ParquetIterableImpl is built without using the user-provided configuration, so it is always built with the default configuration. I would like to update this to add the config before building.
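Roughly, what I have in mind is to pass the configuration into the Hadoop reader builder, something like this sketch against parquet-mr's builder API (the buildReader helper and its parameters are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.hadoop.{ParquetReader => HadoopParquetReader}

// Build the Hadoop reader with the user-provided configuration instead of the default one.
def buildReader[T](readSupport: ReadSupport[T], path: Path, conf: Configuration): HadoopParquetReader[T] =
  HadoopParquetReader.builder(readSupport, path)
    .withConf(conf)
    .build()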
Thanks,
David
Currently, in most cases an incorrect type of input for a record field ends with a scala.MatchError. We do not know what the input was or which field of the record is wrong. Let's fix it.
I have a long-running application based on parquet4s that receives a continuous stream of records from Kafka and writes them out in large batches as parquet files.
It does not use Akka Streams, but just relies on the core ParquetWriter in parquet4s.
Since the ParquetWriter requires me to pass a preassembled collection containing the entire batch, and I want my batches to be quite large, heap usage is problematic and flushing out a file places tremendous pressure on the garbage collector.
I have tried passing the ParquetWriter a lazy stream that doesn't read the records from Kafka until the writer iterates over it, but this did not seem to help.
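To illustrate, the lazy approach I tried looks roughly like this (the record type and the Kafka-polling function are placeholders, and I'm assuming the plain ParquetWriter.write entry point):

import com.github.mjakubowski84.parquet4s.ParquetWriter

case class Record(key: String, value: String) // placeholder schema

// Placeholder for fetching the next record from Kafka on demand.
def pullFromKafka(): Record = ???

val batchSize = 1000000
// A lazily built stream of records, pulled from Kafka as it is iterated.
val lazyBatch: Stream[Record] = Stream.continually(pullFromKafka()).take(batchSize)

ParquetWriter.write("large-batch.parquet", lazyBatch)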
Is my use case supported? If so, what should I do?
Hello @mjakubowski84
First let me say that this is a wonderful project. I've been hoping for something like this for quite a while now, so many thanks for this! I am raising this issue as most often we are working with the Java 8 date/time API. It is oftentimes difficult to work with the java.sql versions, and much conversion is done on our side to accommodate using them in the case classes that we use with parquet4s. My proposal would be to support both the java.sql date/time classes and the java.time classes.
Also, we have even run into issues when rendering certain timestamps in an Impala table: instead of UTC midnight we will see the previous day, but with 24:00 as the time (I'm guessing this has to do with the day and the actual epoch time being encoded separately in Parquet, but I'm not too familiar with exactly how timestamps are encoded in Parquet). So essentially they represent the same instant but are rendered differently. However, I think this is more due to #21 than the data type.
I have made the following commit on my fork that adds support for the java.time API and makes the time zone on TimeValueCodecs configurable (that was really the best way I could think of to make it configurable; I was thinking of a system property, but that makes testing a little awkward and locks you in for the lifetime of your application). I have also updated the test cases to account for these changes. The commit can be found here.
Please let me know if this commit looks suitable to you and I will raise a pull request. Also, if there are any changes that you would like me to make to this, I am more than happy to implement them.
We need good tests for handling nulls and options in fields and collections.
Probably the tests need to be moved to a separate module.
At the moment the example focuses on writing data in Parquet format; could you add an example of reading data from Parquet format? It currently needs an implicit decoder to be added manually; is there an easy way to do so?
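Something along these lines is what I'd hope the example to show (just a sketch, assuming a decoder can be derived for a plain case class the same way the encoder is on the writing side):

import com.github.mjakubowski84.parquet4s.ParquetReader

case class User(id: Long, name: String) // illustrative schema, mirroring the writing example

// Read the file into a closeable iterable and print every record.
val users = ParquetReader.read[User]("users.parquet")
try users.foreach(println)
finally users.close()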
According to the implementation in Spark
Should I expect to be able to use a ParquetReader to read a subset of the columns in a Parquet file?
For example:
case class Orig(a: String, b: Int)
val originals: Seq[Orig] = ???
ParquetWriter[Orig].write("my_file.parquet", originals)
case class Projected(b: Int)
val justBs = ParquetReader.read[Projected]("my_file.parquet")
for { (original, justb) <- originals.zip(justBs) } {
  assert(original.b == justb.b)
}
I think I've seen similar functionality advertised for Hadoop's AvroParquetReader, but it does not seem to work with Parquet4s. (And maybe it is not intended to work, but it never hurts to ask...)
introduce project subpackage
Hello, I'm reading a raw Array[Byte] and want to reconstruct my Parquet object from it. Can I do that with the current implementation? If not, would you consider that to be a valuable contribution?
Thanks,
Boris
Hi Marcin,
I am trying to set up an IndefiniteStreamParquetSink with a programmatically generated Hadoop Configuration which contains a key, secret, and endpoint, as stated in the documentation: https://github.com/mjakubowski84/parquet4s#passing-hadoop-configs-programmatically.
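For reference, the Configuration I generate looks roughly like this (standard Hadoop S3A keys; the values are placeholders):

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
// Credentials and endpoint for the S3-compatible store (placeholder values).
hadoopConf.set("fs.s3a.access.key", "MY_ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "MY_SECRET_KEY")
hadoopConf.set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")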
The IndefiniteStreamParquetSink validates the write path before it continues to create the flow, but it looks like the Hadoop configuration is not being used while validating the write path here:
It looks like it should use the Hadoop Configuration from the given write options, writeOptions.hadoopConf.
Let me know what you think before I create a Pull request :)
Regards,
Tim
I already have a MessageType that I'd like to use to write Parquet files. Most of the actual message type schema setting is private; is there a way to do this that I'm missing?
Update documents AFTER release:
My case class is big (~10 fields, each is an Option[X] for a different X, where X is another case class containing 10 fields with type Option[String] or Option[Seq[String]] or Option[Y] where Y is another case class.... you get the idea)
The problem is that, when trying to resolve the schema, the scala compiler thinks the implicit expansion is divergent and aborts.
Do I have any options here besides describing the entire schema manually?
While trying to execute the basic example code, I am unable to find the ParquetStreams class in the latest jar.
If anybody really needs this functionality, then this issue can be addressed.
At this moment the library allows only immutable collections to be defined in a case class schema. Support for collections like list, seq, array, etc. can be added by developers on their own by defining their own CollectionTransformer (https://github.com/mjakubowski84/parquet4s/blob/master/core/src/main/scala/com/github/mjakubowski84/parquet4s/CollectionTransformer.scala). For Map, one would need to define their own ValueCodec.
The library could provide such functionality out of the box.
... to make releases faster
The goal is to allow developers to define which time zone is used by codecs when reading or writing time-based data to Parquet files.
Use the local machine's settings by default.
To include:
... as in ParquetWriter
Using a default value in case of missing data for a required field (the schema case class requires it but the Parquet file does not have it) is a bad idea. We can get rid of Cats that way as well; one dependency less.
Hi!
Would you be open to a pull request which cuts down the dependencies of your core module from hadoop-client to hadoop-common?
Reasons someone using your project might want this:
I have a proof of concept at rtoomey-coatue@5d385d6 with your tests and integration tests passing.
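In build.sbt the change for the core module would amount to roughly this (hadoopVersion is a placeholder):

val hadoopVersion = "2.9.2" // placeholder version

// before: pulls in the full Hadoop client and its transitive dependencies
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion

// after: only the common classes that the core module needs
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % hadoopVersion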
Either way, your library is awesome and this is something Scala really needs! Also, I enjoyed reading your tests.
I have a use case where I need to periodically append to a Parquet file and then eventually close it. Right now the write method assumes you have all of your data available (or at least a reference to a single iterable). This seems like it should be fairly simple to implement by exposing the functionality of the internalWriter, but it doesn't seem to align well with the current ParquetWriter type class, which only has one method, write(path, data, options). IMO it would be reasonable to make ParquetWriter be a class like:
class IncrementalParquetWriter[T: ParquetRecordEncoder : ParquetSchemaResolver](path: String, options: ParquetWriter.Options = ParquetWriter.Options()) {
  private val writer = internalWriter(new Path(path), ParquetSchemaResolver.resolveSchema[T], options)
  private val valueCodecConfiguration = options.toValueCodecConfiguration
  def write(data: Iterable[T]): Unit = data.foreach { elem =>
    writer.write(ParquetRecordEncoder.encode[T](elem, valueCodecConfiguration))
  }
  def close(): Unit = writer.close()
}
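Usage would then be roughly the following (MyRecord and the two batches are hypothetical):

val writer = new IncrementalParquetWriter[MyRecord]("data.parquet")
try {
  writer.write(firstBatch)  // write an initial chunk of records
  writer.write(laterBatch)  // append more records to the same open file
} finally {
  writer.close()            // flush and finalise the Parquet file
}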
But that breaks the interface currently defined by the type class and AFAIK is actually impossible to implement as a type class since it takes constructor parameters. Do you have any suggestions for how this should be implemented? Should it maybe just totally side-step the type class?
The only thing I've gotten working without totally wrecking the type class methods is the following, which feels very clunky:
/**
 * Type class that allows writing data whose schema is represented by type <i>T</i> to a given path.
 * @tparam T schema of data to write
 */
trait ParquetWriter[T] {

  /**
   * Writes data to the given path.
   * @param path location where files are meant to be written
   * @param data data to write
   * @param options configuration of how Parquet files should be created and written
   */
  def write(path: String, data: Iterable[T], options: ParquetWriter.Options)

  /**
   * Instantiates a new [[IncrementalParquetWriter]].
   * @param path the path to which this writer will write
   * @param options options for writing
   */
  def incrementalWriter(path: String, options: ParquetWriter.Options): IncrementalParquetWriter[T]
}
and use IncrementalParquetReader as the interface in the Akka module
Consider merging to master, a release branch, or something else.