Golang处理parquet文件实战指南

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

Golang处理parquet文件实战指南

梦想画家   2023-03-23 我要评论

前言

Parquet是Apache基金会支持的项目,是面向列存储二进制文件格式。支持不同类型的压缩方式,广泛用于数据科学和大数据环境,如Hadoop生态。

本文主要介绍Go如何生成和处理parquet文件。

创建结构体

首先创建struct,用于表示要处理的数据:

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

这里要提醒的是tag,用于说明struct中每个字段在生成parquet过程中如何被处理。

parquet-go包可以处理parquet数据,更多的tag可以参考其官网。

生成parquet文件

下面现给出生成parquet文件的代码,然后分别进行说明:

package main

import (
  "fmt"
  "log"
  "time"
  "github.com/bxcodec/faker/v3"
  "github.com/xitongsys/parquet-go-source/local"
  "github.com/xitongsys/parquet-go/parquet"
  "github.com/xitongsys/parquet-go/reader"
  "github.com/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }

}

func generateParquet(data []*user) error {
  log.Println("generating parquet file")
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()
  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil
}

定义结构体上面已经说明,但需要提醒的是类型与文档保持一致:

Primitive TypeGo Type
BOOLEANbool
INT32int32
INT64int64
INT96(deprecated)string
FLOATfloat32
DOUBLEfloat64
BYTE_ARRAYstring
FIXED_LEN_BYTE_ARRAYstring

接着就是使用faker包生成模拟数据。然后调用err := generateParquet(data)方法。该方法大概逻辑为:

  • 首先准备输出文件,然后基于本地输出文件构造pw,用于写parquet数据:
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }

  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()

然后设置压缩类型,并通过defer操作确保关闭文件。下面开始写数据:

  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil

循环写数据,最后调用pw.WriteStop()停止写。 成功写文件后,下面介绍如何读取parquet文件。

读取parquet文件

首先介绍如何一次性读取文件,主要用于读取较小的文件:

func readParquet() ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}

大概流程如下:首先定义本地文件,然后构造pr用于读取parquet文件:

  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

然后定义目标内容容器u,一次性读取数据:

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()

但一次性大量记录加载至内存可能有问题。这是官方文档提示:

If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don’t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.

大意是不要一次读取文件至内存,可能造成OOM。实际应用中应该分页读取,下面通过代码进行说明:

func readPartialParquet(pageSize, page int) ([]*user, error) {
	fr, err := local.NewLocalFileReader("output.parquet")
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = fr.Close()
	}()

	pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
	if err != nil {
		return nil, err
	}
	defer pr.ReadStop()

	//num := pr.GetNumRows()
	
	pr.SkipRows(int64(pageSize * page))
	u := make([]*user, pageSize)
	if err = pr.Read(&u); err != nil {
		return nil, err
	}

	return u, nil
}

与上面函数差异不大,首先函数包括两个参数,用于指定页大小和页数,关键代码是跳过一定记录:

  pr.SkipRows(int64(pageSize * page))

根据这个方法可以获得总行数,pr.GetNumRows(),然后结合页大小计算总页数,最后循环可以实现分页查询。

计算列平均值

既然使用了Parquet列存储格式,下面演示下如何计算Score列的平均值。

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  num := int(pr.GetNumRows())

  data, _, _, err := pr.ReadColumnByPath("parquet_go_root\u0001score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return (result / float64(num)), nil
}

首先打开文件,然后调用pr.GetNumRows()方法获取总行数。然后基于路径指定列,其中parquet_go_root为根路径,因为前面使用字节数组,这里分割符变为\u0001,完整路径为:parquet_go_root\u0001score

总结

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们