
Comments (33)

xitongsys avatar xitongsys commented on September 27, 2024 2

@IkiM0no
These two issues were bugs and I have fixed them. You can try the latest version.
Thanks :)


xitongsys avatar xitongsys commented on September 27, 2024 1

@IkiM0no
Thanks for your suggestions. I have added a plugin named CSVWriter which should meet your requirement.
A description has been added to the readme file, and there is an example in example/csv_write.go. You can download the latest version.


xitongsys avatar xitongsys commented on September 27, 2024 1

hi, @IkiM0no Sorry about the delay. I have recently rewritten many parts of parquet-go, which gives a great performance improvement. Some of the functions have changed, so please use the latest version.

The Parquet writer has an internal buffer, so you can't create a new writer to append more data to the same parquet file. I have changed many places and fixed some bugs in your code; now it works.
(One thing confuses me, though: why do you write all the data twice to the same file?)
Hope it's helpful :)
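
For illustration, a minimal sketch of that lifecycle (one writer per output file, kept open for all rows), using the same NewLocalFileWriter/NewCSVWriter/WriteString/Flush/WriteStop calls as the full example in the next comment; treat it as a sketch rather than a canonical example:

package main

import (
	. "github.com/xitongsys/parquet-go/ParquetFile"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

func main() {
	md := []MetadataType{
		{Type: "UTF8", Name: "Name"},
		{Type: "INT32", Name: "Age"},
	}

	// Create the writer once and keep it for the whole file: the writer
	// buffers rows internally, so opening a second writer later to
	// "append" to the same file will not produce a valid parquet file.
	pf, _ := NewLocalFileWriter("out.parq")
	pw, _ := NewCSVWriter(md, pf, 10)

	name, age := "StudentName", "20"
	pw.WriteString([]*string{&name, &age})

	// Flush the buffered rows, then write the footer.
	pw.Flush(true)
	pw.WriteStop()
}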


xitongsys avatar xitongsys commented on September 27, 2024 1

@IkiM0no I have updated parquet-go again; the reader/writer is now implemented inside the library, so users do not need to implement it themselves. Please use the latest version. I have changed and tested your code, and it works.

package main

import (
	"encoding/csv"
	"fmt"
	"github.com/satori/go.uuid"
	. "github.com/xitongsys/parquet-go/ParquetFile"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
	"log"
	"os"
	"strings"
	"sync"
)

var (
	workers       = make(map[string]chan []string)
	parquetWriter = make(map[string]*CSVWriter)
	csvWriter     = make(map[string]*csv.Writer)
	wg            = sync.WaitGroup{}
	mu            = sync.Mutex{}
	localPath     = "./"
	path          = localPath
	md            = []MetadataType{
		{Type: "UTF8", Name: "Name"},
		{Type: "INT32", Name: "Age"},
		{Type: "INT64", Name: "Id"},
		{Type: "FLOAT", Name: "Weight"},
		//{Type: "BOOLEAN", Name: "Sex"},
	}
)

func writerT(csvRaw string, tableMetaData []MetadataType) {
	defer wg.Wait()
	_ = tableMetaData // the schema is taken from the global md in process()
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) - 1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		c <- rec
	} else {
		nc := make(chan []string)
		workers[part] = nc

		u := fmt.Sprintf("%v", uuid.NewV4())
		csvName := localPath + u + ".csv"
		csvFile, _ := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		csvWriter[part] = csv.NewWriter(csvFile)

		fName := localPath + u + ".parq"
		pf, _ := NewLocalFileWriter(fName)
		parquetWriter[part], _ = NewCSVWriter(md, pf, 10)

		go workerP(nc)
		nc <- rec
	}
}

func workerP(c chan []string) {
	wg.Add(1)
	defer wg.Done()

	var pw *CSVWriter
	var writer *csv.Writer

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			key := rec[len(rec)-1]
			pw = parquetWriter[key]
			writer = csvWriter[key]

			mu.Lock()
			// PARQUET
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			// drop partition col
			row = row[:len(row)-1]
			pw.WriteString(row)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()
			part = append(part, rec)

		} else {
			if len(part) <= 0 {
				break
			}

			key := part[0][len(part[0])-1]
			pw = parquetWriter[key]
			writer = csvWriter[key]

			mu.Lock()
			// PARQUET
			for _, p := range part {
				row := make([]*string, len(p))
				for j := 0; j < len(p); j++ {
					row[j] = &p[j]
				}
				pw.WriteString(row[:len(row)-1])
			}

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			break
		}
	}
	pw.Flush(true)
	pw.WriteStop()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}

func main() {
	num := 10
	res := ""
	for i := 0; i < num; i++ {
		res += fmt.Sprintf("%s_%d", "Student Name", i) + "," +
			fmt.Sprintf("%d", 20+i%5) + "," +
			fmt.Sprintf("%d", i) + "," +
			fmt.Sprintf("%f", 50.0+float32(i)*0.1) + "," +
			fmt.Sprintf("%t", i%2 == 0) + "\n"
	}
	log.Println(res)
	writerT(res, md)
}


nutzhub avatar nutzhub commented on September 27, 2024

I'm also testing it out; it would be a great idea to provide godoc and CI badges.


xitongsys avatar xitongsys commented on September 27, 2024

hi @kelindar @nut-abctech , sorry about the delayed reply and thanks for your suggestions.
I'm trying to add more unit tests and regression tests, godoc and CI. Help is needed and everything is welcome :)
To be on the safe side, I suggest using it in production only once the unit and regression tests are finished.
Testing and feedback are welcome, thanks!


IkiM0no avatar IkiM0no commented on September 27, 2024

Hi @xitongsys , we are looking to convert .csv files on S3 to parquet. Would this tool be suitable?
From what I can see in /example/benchmark/WriteParquet.go, it looks like at line 80 you are generating some demo data in a loop.
Theoretically, could one do something similar, but instead loop over some .csv file, parse each row to an object like Student, and write to .parquet as you have? Thanks for your consideration.


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no Yes, it works, but you should provide an S3 interface similar to the one in my example.

In addition, I'm considering some other work:
(1) add more examples of converting csv/orc... to parquet.
(2) since flat data is the most common case, add some specific functions for converting it to parquet, which will streamline the process and give much better performance.
So this project is still under active development and will gain many more functions. You can follow the latest updates :)
Hope it's helpful


IkiM0no avatar IkiM0no commented on September 27, 2024

Very helpful @xitongsys thank you very much.
I will attempt the .csv conversion to parquet using your library. If successful, I'll post a gist that perhaps you could incorporate into your examples.
Regarding "you should provide the similar interface of S3 as my example", could you point me to where this is? I can't seem to find it by searching "S3" in this repo. Thanks again!


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no oh... sorry about my misleading wording.
I have provided local/hdfs examples, which include the interface implementation. If you want to write/read files on S3, you should implement a similar interface.
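
To sketch the idea only (this is not the library's API: the exact method set of the ParquetFile interface may differ between versions, and the S3 upload is left as a placeholder rather than real SDK calls), an S3-backed file could stage writes in a local temporary file and push it to S3 when it is closed:

package s3file

import (
	"os"
)

// S3File is a hypothetical implementation in the spirit of the local-file
// example: parquet-go writes into a local staging file, and Close is where
// the finished file would be uploaded to S3 with whatever client you use.
type S3File struct {
	File   *os.File
	Bucket string
	Key    string
}

func (s *S3File) Create(name string) (*S3File, error) {
	f, err := os.Create(name) // local staging file
	return &S3File{File: f, Bucket: s.Bucket, Key: name}, err
}

func (s *S3File) Write(b []byte) (int, error) { return s.File.Write(b) }

func (s *S3File) Close() error {
	if err := s.File.Close(); err != nil {
		return err
	}
	// TODO: upload the staging file s.File.Name() to s3://<Bucket>/<Key>
	// using your S3 client, then remove the local copy.
	return nil
}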


IkiM0no avatar IkiM0no commented on September 27, 2024

Thanks for the clarification @xitongsys :)
Sorry, but one more question. I was able to create the parquet file from csv, but when I upload it to S3 and create a table from the parquet file, all the data is "broken", meaning the columns contain byte characters.

I suspect this may be due to the parameters I am passing to ph.WriteInit(), particularly np and objAveSize. Should these values correspond to the dimensions of the table? What should I pass here, for example if my data is 3 columns and 3 rows?


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no
These two parameters only influence performance.
objAveSize is the approximate size of each row.

What do you mean by "broken"? Do you store strings but get byte values back, like
"ABC" -> [65, 66, 67]?
If that is the situation, you can use the UTF8 type instead of BYTE_ARRAY.
BYTE_ARRAY is a base type in parquet, so it can store many kinds of values.
Besides, I don't know how you create the table from S3 (maybe Hive?); you can upload the results, which would give more hints.
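
To make the difference concrete, a tiny metadata sketch in the CSVWriter style used elsewhere in this thread (the column names are made up, and the exact set of accepted type strings depends on your parquet-go version):

package main

import (
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

func main() {
	md := []MetadataType{
		// UTF8 annotates the bytes as text, so Hive/Impala/Spark show a string.
		{Type: "UTF8", Name: "First_name"},
		// Plain BYTE_ARRAY is the base type; readers return raw bytes,
		// which is what "ABC" -> [65, 66, 67] looks like.
		{Type: "BYTE_ARRAY", Name: "Raw_payload"},
	}
	_ = md // pass md to NewCSVWriter as in the other examples in this thread
}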


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys, thanks, let me provide some clarification and my steps.

My csv sitting on s3 is very simple, 3 rows, 3 columns like this:

rob,pike,50
jane,doe,33
foo,bar,22

My test script is just grabbing this file from S3, no problems there.
The rest of the script is taken from your example: /example/benchmark/WriteParquet.go
For each row in the csv I create a 'Person' struct (with the appropriate UTF8 and INT64 data types), just like your 'Student' struct, and write it to the parquet file.
I put this file back on S3 manually and create a table in Impala:

CREATE EXTERNAL TABLE temp.parqtest (`First` STRING, `Last` STRING, `Age` INT) 
STORED AS PARQUET 
LOCATION  's3a://path/to/parquet/directory/'
;

When I select * from that table temp.parqtest, the data is all jumbled together. It doesn't produce the rows and columns I expect. I'll post a screenshot tomorrow.
Now I am attaching the parquet file and the go script I used.
Thank you very much for your assistance!

tmp.zip


xitongsys avatar xitongsys commented on September 27, 2024

Hi, @IkiM0no
In your pq.go, line 62,
'ph.WriteInit(f, new(Person), 3, 30)' is wrong;
you should use
'ph.WriteInit(f, new(PP), 3, 30)'

You can try it, and if there are still errors, please tell me :)


IkiM0no avatar IkiM0no commented on September 27, 2024

Ah! Please forgive my user error. I have corrected this issue. I have also changed the variable names so they aren't the SQL reserved words 'First' and 'Last', so 'First' -> 'First_name', etc.

Now I can inspect the schema and file using parquet-tools.

$ parquet-tools schema test.parquet
message parquet_go_root {
  required int64 Person_age;
  required binary Last_name (UTF8);
  required binary First_name (UTF8);
}
$ parquet-tools head -n 5 test.parquet
Person_age = 50
Last_name = pike
First_name = rob
... etc

Great!

Interestingly though, the columns appear in reverse order. I have seen that this can be an issue for Impala.

Anyway, I dropped and re-created my table using either column order, then refreshed:

CREATE EXTERNAL TABLE staging.ptest 
(Person_age INT, Last_name STRING, First_name STRING)
--( First_name STRING, Last_name STRING, Person_age INT)
STORED AS PARQUET
LOCATION  's3a://<my_bucket>/path/to/test_parq/'
;
REFRESH staging.ptest;

Now when I query the table, Impala complains:

File 's3a://<my_bucket>/path/to/test_parq/test.parquet' uses an unsupported encoding: DELTA_BINARY_PACKED for column 'First'.

Can I specify some other encoding?

EDIT: I found this page stating that Impala supports only certain encodings: https://www.cloudera.com/documentation/enterprise/5-3-x/topics/impala_parquet.html

parquet-tools meta shows:

$ parquet-tools meta test.parquet
creator:     null

file schema: parquet_go_root
---------------------------------------------------------------------------------------------------------------------------------------
Person_age:  REQUIRED INT64 R:0 D:0
Last_name:   REQUIRED BINARY O:UTF8 R:0 D:0
First_name:  REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:3 TS:438
---------------------------------------------------------------------------------------------------------------------------------------
First_name:   BINARY SNAPPY DO:0 FPO:0 SZ:150/144/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED
Person_age:   INT64 SNAPPY DO:0 FPO:150 SZ:147/141/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED
Last_name:    BINARY SNAPPY DO:0 FPO:297 SZ:141/135/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys
Happy to report that since your latest commit, my example is working! :)
Thank you for your support.
I will try on a production-scale test next to see what we can produce.


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no
Happy to hear that, and I look forward to your further feedback :) thanks


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys something I've been working on is abstracting away the need to write/hard-code a struct for each csv file I am converting to parquet (with the help of parquet-go, of course (: ).

I'm expressing this as schemata.go in my project:

package main
import (
	"github.com/xitongsys/parquet-go/parquet"
)
type Record struct {
	Cols []Column
}
type Column struct {
	Value  string
	PqType parquet.ConvertedType
}

and generating a list of type Record from my csv data at runtime.

The issue is that type Record is not the kind of struct expected by parquet-go/ParquetHandler/Write.go.

How would you recommend approaching this?

I feel like maybe Write.go should expose a method that implements Write functionality for structs like Record so that users of your library can avoid hard-coding data structures.
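
For what it's worth, the CSVWriter metadata from earlier in this thread already allows a runtime schema. Here is a rough sketch (the buildMetadata helper and its type lookup are hypothetical, not part of parquet-go) that derives []MetadataType from a CSV header instead of a hard-coded struct:

package main

import (
	"encoding/csv"
	"strings"

	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

// buildMetadata is a hypothetical helper: it maps CSV header names to
// parquet types at runtime so that no Go struct has to be written per file.
// Where the per-column types come from (config, sampling, ...) is up to you.
func buildMetadata(header []string, typeOf map[string]string) []MetadataType {
	md := make([]MetadataType, 0, len(header))
	for _, name := range header {
		t, ok := typeOf[name]
		if !ok {
			t = "UTF8" // default unknown columns to strings
		}
		md = append(md, MetadataType{Type: t, Name: name})
	}
	return md
}

func main() {
	r := csv.NewReader(strings.NewReader("Name,Age,Id\nrob,30,1\n"))
	header, _ := r.Read()
	md := buildMetadata(header, map[string]string{"Age": "INT32", "Id": "INT64"})
	_ = md // pass md to NewCSVWriter, as in the examples elsewhere in this thread
}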


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys apologies for the delay, but I have now had time to test the CSVWriter and happy to report this works brilliantly! Many thanks. I've been telling people about this library and how great it is. Will continue to test and provide feedback as we scale up our efforts :)


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no
thanks for your testing and feedback.
Parquet-go is still under active development, so it will keep changing. If your code doesn't work with the latest version, you can read the readme file :)
Today I added some changes:

  1. You should now call Flush() before WriteStop().
  2. CSVWriter has also changed. It supports two types of input: strings and typed values.

Maybe you should update your code.

The readme and examples have been updated; you can read them.


IkiM0no avatar IkiM0no commented on September 27, 2024

Hi @xitongsys. I'm now using parquet-go to iterate over a data set and want to write to multiple parquet files based on a partition column in the data. So all rows with partition col = "01" write to 01.parq, if partition col = "02" write to 02.parq, etc.

I am spinning off a 'worker' goroutine for each row and passing the row over a channel to a worker() function.

Now I would like to create a different version of the Create() method from your example that creates the parquet file if it does not exist (that partition has not yet been seen), or simply appends to that .parq file if it has already been created. I have done this successfully with plain .csv files, but am having trouble with parquet-go because ParquetFile is an interface and I'm not sure how to extend it.

This is my basic method:

func (self *PqFile) CreateOrAppend(name string) (ParquetFile, error) {
	//file, err := os.Create(name)
	file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	myFile := new(PqFile)
	myFile.File = file
	return myFile, err
}

Do you have advice on how I can add this new method to the *PqFile object?


xitongsys avatar xitongsys commented on September 27, 2024

hi, @IkiM0no , do you mean you must use 'CreateOrAppend' as your function name? If not, why don't you put your code in the Create() method?


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys good suggestion, I have implemented Create() as suggested:

func (self *PqFile) Create(name string) (ParquetFile, error) {
	//file, err := os.Create(name)
	file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	myFile := new(PqFile)
	myFile.File = file
	return myFile, err
}

Below is writer.go in my project.

As a test, I added a csv writer to workerP() to test if I had some bug in my concurrency.

The csv files write as expected, and the .parq files are created, but only contain a string 'PAR1'. I know I have a bug somewhere. I have added lots of print statements, verified that the channels are sending and receiving data, but cannot seem to locate it.

Do you see anything I have implemented incorrectly in my NewCSVWriterHandler or maybe in the Create function?

package main

import (
	"os"
	"fmt"
	"log"
	"sync"
	"strings"
	"crypto/rand"
	"encoding/csv"
	. "github.com/xitongsys/parquet-go/ParquetHandler"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

var (
	// list of channels to communicate with workers
	// workers accessed synchronously, no mutex required
	workers = make(map[string]chan []string)

	// wg is to make sure all workers done before exiting main
	wg = sync.WaitGroup{}

	// mu used only for sequential printing, not relevant for program logic
	mu = sync.Mutex{}
	path = localPath
)

func writerT(csvRaw string, tableMetaData []MetadataType) {

	// wait for all workers to finish up before exit
	defer wg.Wait()

	// Set local instance of tableMetaData from what was passed in
	tableMetaData = tableMetaData
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) -1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		// send rec to worker
		c <- rec
	} else {
		// if no worker for the partition

		// make a chan
		nc := make(chan []string)
		workers[part] = nc

		// start worker with this chan
		go workerP(nc)

		// send rec to worker via chan
		nc <- rec
	}
}

func workerP(c chan []string) {

	// wg.Done signals to main worker completion
	wg.Add(1)
	defer wg.Done()

	u, _ := newUUID()

	// CSV
	csvName := localPath + u + ".csv"
	// If the file doesn't exist, create it, or append to the file
	csvFile, err := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	writer := csv.NewWriter(csvFile)

	fName := localPath + u + ".parq"
	fmt.Println(fName)

	// PARQUET
	var f ParquetFile
	f = &PqFile{}
	f, err = f.Create(fName)
	if err != nil {
		fmt.Println(err)
	}
	ph := NewCSVWriterHandler()
	ph.WriteInit(tableMetaData, f, 10, 30)

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			// save the rec
			// instead of accumulation in memory
			// this can be saved to file directly

			mu.Lock()
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			ph.Write(row)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()

			part = append(part, rec)
		} else {
			// channel closed on EOF
			// dump partition
			// lock ensures sequential printing
			// not required for independent files

			mu.Lock()
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			ph.Write(row)

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			return
		}
	}
	ph.Flush()
	ph.WriteStop()
	f.Close()
	csvFile.Close()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}

// newUUID generates a random UUID according to RFC 4122
func newUUID() (string, error) {
	uuid := make([]byte, 16)
	n, err := io.ReadFull(rand.Reader, uuid)
	if n != len(uuid) || err != nil {
		return "", err
	}
	// variant bits; see section 4.1.1
	uuid[8] = uuid[8]&^0xc0 | 0x80
	// version 4 (pseudo-random); see section 4.1.3
	uuid[6] = uuid[6]&^0xf0 | 0x40
	return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}


xitongsys avatar xitongsys commented on September 27, 2024

hi @IkiM0no
In your code, the following part may be wrong:

row := make([]*string, len(rec))   
for j := 0; j < len(rec); j++ {   
       row[j] = &rec[j]   
}   
ph.Write(row)  

You should use ph.WriteString(rec).
The Write function writes typed values directly. The WriteString function writes strings and converts them to the corresponding types declared in the metadata.
For details, you can read the example in the readme file. Hope it's helpful :)
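
For illustration, a small fragment showing both input styles against the same handler (the Write call with pre-typed values is an assumption about its signature, so check the readme for your version):

// WriteString: raw CSV fields as strings; the handler converts each field
// using the column type declared in the metadata (UTF8, INT32, ...).
rec := []string{"rob", "30"}
row := make([]*string, len(rec))
for j := range rec {
	row[j] = &rec[j]
}
ph.WriteString(row)

// Write: values that already have the right Go types, written directly
// (assumed form; see the readme for the exact signature).
ph.Write([]interface{}{"rob", int32(30)})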


IkiM0no avatar IkiM0no commented on September 27, 2024

Thank you for the suggestion @xitongsys :) I see what happened, I did not go get the latest version of Plugin/CSVWriter explicitly, so I had an older version without the WriteString() method.

I have implemented the change you suggested, but I still see only "PAR1" in the parquet files, while the csv files (written by the same workerP()) contain complete data.

I wonder what the issue is because those are the only 4 bytes contained in each parquet file. I have added a small change to drop the last item in the row slice, as that is the partition column, not contained in tableMetaData, but apart from that it looks just like the README you provided. Anyway, I confirmed the worker can still 'see' the tableMetaData by printing it just before the call to ph.WriteInit(). The Create method is the same as I provided before.

This is my updated writer.go with your suggested WriteString() change, but it is still not working as expected.

Do you think I need to go get all of the parquet-go packages again? Maybe I missed some critical push you made in the past month? Or do you see some issue with my code?

package main

import (
	"os"
	//"io"
	"fmt"
	"log"
	"sync"
	//"reflect"
	"strings"
	"encoding/csv"
	. "github.com/xitongsys/parquet-go/ParquetHandler"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

var (
	// list of channels to communicate with workers
	// workers accessed synchronously, no mutex required
	workers = make(map[string]chan []string)

	// wg is to make sure all workers done before exiting main
	wg = sync.WaitGroup{}

	// mu used only for sequential printing, not relevant for program logic
	mu = sync.Mutex{}
	path = localPath

	// Moved to global scope
	//tableMetaData []MetadataType
)

func writerT(csvRaw string, tableMetaData []MetadataType) {

	// wait for all workers to finish up before exit
	defer wg.Wait()

	// Set local instance of tableMetaData from what was passed in
	tableMetaData = tableMetaData
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) -1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		// send rec to worker
		c <- rec
	} else {
		// if no worker for the partition

		// make a chan
		nc := make(chan []string)
		workers[part] = nc

		// start worker with this chan
		go workerP(nc)

		// send rec to worker via chan
		nc <- rec
	}
}

func workerP(c chan []string) {

	// wg.Done signals to main worker completion
	wg.Add(1)
	defer wg.Done()

	u, _ := newUUID()

	// CSV
	csvName := localPath + u + ".csv"
	// If the file doesn't exist, create it, or append to the file
	csvFile, err := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	writer := csv.NewWriter(csvFile)

	fName := localPath + u + ".parq"
	fmt.Println(fName)

	// PARQUET
	var pf ParquetFile
	//var err error
	pf = &MyFile{}
	pf, err = pf.Create(fName)
	if err != nil {
		fmt.Println(err)
	}

	ph := NewCSVWriterHandler()
	fmt.Println("TMD:", tableMetaData)
	ph.WriteInit(tableMetaData, pf, 10, 30)

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			// save the rec
			// instead of accumulation in memory
			// this can be saved to file directly

			mu.Lock()
			// PARQUET
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}

			// drop partition col
			row = row[:len(row) - 1]
			ph.WriteString(row)

			// cannot use rec (type []string) as type []*string in argument to ph.WriteString
			//rec = rec[:len(rec) - 1]
			//ph.WriteString(rec)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()

			part = append(part, rec)

		} else {
			// channel closed on EOF
			// dump partition
			// lock ensures sequential printing
			// not required for independent files

			mu.Lock()
			// PARQUET
			for _, p := range part {
				row := make([]*string, len(p))
				for j := 0; j < len(p); j++ {
					row[j] = &p[j]
					// drop partition col
					noPart := row[:len(row) - 1]
					ph.WriteString(noPart)
				}
			}

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			return
		}
	}
	ph.Flush()
	ph.WriteStop()
	pf.Close()
	csvFile.Close()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}


IkiM0no avatar IkiM0no commented on September 27, 2024

Thank you @xitongsys very much for your continued support and development on parquet-go! :)
I will begin testing on the new version tomorrow and will report back progress.


IkiM0no avatar IkiM0no commented on September 27, 2024

@xitongsys happy to report this latest v0.9.8 is working very, very well! 👍 The new Writer capabilities are making this very fast now that we can use concurrency.

One thing I'm still wondering about is how to properly pass dates to parquet-go.
In my data I have timestamp columns in this format: "2017-11-02 18:32:45.94". I have made sure that in the table metadata these are assigned the "DATE" type, but when I inspect the data with parquet-tools (CLI utility) I see only the year, for example:

$ parquet-tools head -n 10 myfile.parq
ingest_datetime = 2017
event_datetime = 2017
... etc, more columns

This is the output for inspecting the same columns' schema:

$ parquet-tools schema myfile.parq
 optional int32 ingest_datetime (DATE);
 optional int32 event_datetime (DATE);
... etc, more columns

Should I be formatting the timestamp(s) before writing them to the file? Perhaps it prefers Unix epoch format?


xitongsys avatar xitongsys commented on September 27, 2024

@IkiM0no "DATE is used to for a logical date type, without a time of day. It must annotate an int32 that stores the number of days from the Unix epoch, 1 January 1970."
Example is in example/local_flat.go. I test the parquet file in spark/hive , and the output is

+-----------+---+---+------+-----+----------+
|       Name|Age| Id|Weight|  Sex|       Day|
+-----------+---+---+------+-----+----------+
|StudentName| 20|  0|  50.0| true|2017-11-28|
|StudentName| 21|  1|  50.1|false|2017-11-28|
|StudentName| 22|  2|  50.2| true|2017-11-28|
|StudentName| 23|  3|  50.3|false|2017-11-28|
|StudentName| 24|  4|  50.4| true|2017-11-28|
|StudentName| 20|  5|  50.5|false|2017-11-28|
|StudentName| 21|  6|  50.6| true|2017-11-28|
|StudentName| 22|  7|  50.7|false|2017-11-28|
|StudentName| 23|  8|  50.8| true|2017-11-28|
|StudentName| 24|  9|  50.9|false|2017-11-28|
+-----------+---+---+------+-----+----------+

If you want to store a timestamp, I think you should use TIMESTAMP_MILLIS.
"TIMESTAMP_MILLIS is used for a combined logical date and time type, with millisecond precision. It must annotate an int64 that stores the number of milliseconds from the Unix epoch, 00:00:00.000 on 1 January 1970, UTC."
For more information, you can see the parquet-format project: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
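
For the timestamp format quoted above, a minimal stdlib-only sketch of producing both representations (days since the epoch for DATE, milliseconds since the epoch for TIMESTAMP_MILLIS); treating the timestamps as UTC is an assumption:

package main

import (
	"fmt"
	"time"
)

func main() {
	ts := "2017-11-02 18:32:45.94"
	// time.Parse accepts the fractional seconds even though the layout
	// omits them; without a zone in the layout, the result is in UTC.
	t, err := time.Parse("2006-01-02 15:04:05", ts)
	if err != nil {
		panic(err)
	}

	days := int32(t.Unix() / 86400)                  // value to store for a DATE column
	millis := t.UnixNano() / int64(time.Millisecond) // value to store for TIMESTAMP_MILLIS

	fmt.Println(days, millis)
}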


tejasmanohar avatar tejasmanohar commented on September 27, 2024

I was wondering if there's an update here on whether this library should be considered stable for production usage.


xitongsys avatar xitongsys commented on September 27, 2024

As far as I know, several people are using this library in their production environments, so I think you can also give it a try :)


aohua avatar aohua commented on September 27, 2024

@tejasmanohar I'm already using it in a production application. This was the only Go library I found that can read and write parquet files. It saves me a lot of work and has worked very well so far. However, in my case performance is not critical. Why not give it a try, and maybe you can help improve it as well.


tejasmanohar avatar tejasmanohar commented on September 27, 2024

@xitongsys @aohua Sweet. I just wanted to know if folks are depending on it.

Why not give it a try

Because I can also just use the JVM like we have to for Spark :trollface:

