Defining a Schema and Generating a Parquet File in Python

  I had previously implemented Avro-to-Parquet conversion in both Java and Python, so the schemas were always defined in Avro. What I want to try here is defining a Parquet schema directly, then filling it with data and generating a Parquet file from it.

  This post demonstrates two examples: one with two flat fields, and one with a nested field. The Python modules used are pandas and pyarrow.
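Both modules are published on PyPI, so if they are not installed yet, a standard pip install should be all that's needed (assuming a Python 3 environment with pip available):

$ pip install pandas pyarrow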

Simple Field Definition

Defining the Schema and Generating the Parquet File

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Define the schema
schema = pa.schema([
    ('id', pa.int32()),
    ('email', pa.string())
])

# Prepare the data
ids = pa.array([1, 2], type=pa.int32())
emails = pa.array(['first@example.com', 'second@example.com'], pa.string())

# Build the Parquet data
batch = pa.RecordBatch.from_arrays(
    [ids, emails],
    schema=schema
)
table = pa.Table.from_batches([batch])

# Write the Parquet file plain.parquet
pq.write_table(table, 'plain.parquet')
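Incidentally, pandas is imported above but never used for the write. If a DataFrame is the more natural starting point, the same file can plausibly be produced through pa.Table.from_pandas; a minimal sketch using the same schema:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    ('id', pa.int32()),
    ('email', pa.string())
])

# Build a DataFrame and let from_pandas coerce it to the declared schema
df = pd.DataFrame({
    'id': [1, 2],
    'email': ['first@example.com', 'second@example.com'],
})
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
pq.write_table(table, 'plain.parquet')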

Verifying the Parquet Data File

We can use the parquet-tools utility to inspect the data and schema of the plain.parquet file:

$ parquet-tools schema plain.parquet
message schema {
    optional int32 id;
    optional binary email (STRING);
}
$ parquet-tools cat --json plain.parquet
{"id":1,"email":"first@example.com"}
{"id":2,"email":"second@example.com"}

No surprises, exactly what we expected. We can also fetch the schema and data with pyarrow code:

schema = pq.read_schema('plain.parquet')
print(schema)

df = pd.read_parquet('plain.parquet')
print(df.to_json())

The output is:

id: int32
  -- field metadata --
  PARQUET:field_id: '1'
email: string
  -- field metadata --
  PARQUET:field_id: '2'
{"id":{"0":1,"1":2},"email":{"0":"first@example.com","1":"second@example.com"}}
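The file can also be read back without pandas at all; a small sketch using pq.read_table (to_pylist assumes a reasonably recent pyarrow release):

import pyarrow.parquet as pq

table = pq.read_table('plain.parquet')  # a pyarrow.Table
print(table.schema)       # same schema as pq.read_schema above
print(table.to_pylist())  # [{'id': 1, 'email': 'first@example.com'}, ...]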

Nested Field Definition

The next schema definition adds a nested object: under address there are email_address and post_address. The schema definition and the code that generates the Parquet file are as follows:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Inner fields
address_fields = [
    ('email_address', pa.string()),
    ('post_address', pa.string()),
]

# Define the Parquet schema; address nests address_fields
schema = pa.schema([
    ('id', pa.int32()),
    ('address', pa.struct(address_fields))
])

# Prepare the data
ids = pa.array([1, 2], type=pa.int32())
addresses = pa.array(
    [('first@example.com', 'city1'), ('second@example.com', 'city2')],
    pa.struct(address_fields)
)

# Build the Parquet data
batch = pa.RecordBatch.from_arrays(
    [ids, addresses],
    schema=schema
)
table = pa.Table.from_batches([batch])

# Write the Parquet data to a file
pq.write_table(table, 'nested.parquet')
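As an aside, for struct types pa.array also accepts dicts keyed by field name, which can read more clearly than positional tuples; the addresses array above could equally be built as:

addresses = pa.array(
    [
        {'email_address': 'first@example.com', 'post_address': 'city1'},
        {'email_address': 'second@example.com', 'post_address': 'city2'},
    ],
    pa.struct(address_fields)
)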

Verifying the Parquet Data File

Again we use parquet-tools to inspect the nested.parquet file:

$ parquet-tools schema nested.parquet
message schema {
    optional int32 id;
    optional group address {
        optional binary email_address (STRING);
        optional binary post_address (STRING);
    }
}
$ parquet-tools cat --json nested.parquet
{"id":1,"address":{"email_address":"first@example.com","post_address":"city1"}}
{"id":2,"address":{"email_address":"second@example.com","post_address":"city2"}}

The schema printed by parquet-tools never mentions the word struct, but it does show the nesting relationship between address and its child attributes.

Here is what reading the schema and data of nested.parquet with pyarrow code looks like:

schema = pq.read_schema("nested.parquet")
print(schema)

df = pd.read_parquet('nested.parquet')
print(df.to_json())

The output is:

id: int32
  -- field metadata --
  PARQUET:field_id: '1'
address: struct<email_address: string, post_address: string>
  child 0, email_address: string
    -- field metadata --
    PARQUET:field_id: '3'
  child 1, post_address: string
    -- field metadata --
    PARQUET:field_id: '4'
  -- field metadata --
  PARQUET:field_id: '2'
{"id":{"0":1,"1":2},"address":{"0":{"email_address":"first@example.com","post_address":"city1"},"1":{"email_address":"second@example.com","post_address":"city2"}}}

The data is of course the same. The slight difference is in the displayed schema, where address is labeled struct<email_address: string, post_address: string>, explicitly marking it as a struct type rather than only showing the nesting hierarchy.
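If only a single child column is needed, the struct can be taken apart on the pyarrow side; one way to do it, sketched under the assumption that combine_chunks collapses the column into a single StructArray (true in current pyarrow releases):

import pyarrow.parquet as pq

table = pq.read_table('nested.parquet')
address = table['address'].combine_chunks()  # ChunkedArray -> StructArray
print(address.field('email_address'))        # ['first@example.com', 'second@example.com']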

To close with an open question: so far we have defined the Parquet schema entirely in Python code. Can Parquet, like Avro, define its schema in an external file and then compile it for Python to use?
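I have not found a compile step like Avro's, but one possible direction (my own sketch, not something the Parquet tooling prescribes) is pyarrow's IPC schema serialization, which at least lets the schema live in an external file:

import pyarrow as pa
import pyarrow.ipc

# Save the schema to an external file as an IPC-serialized buffer
schema = pa.schema([('id', pa.int32()), ('email', pa.string())])
with open('plain.schema', 'wb') as f:
    f.write(schema.serialize())

# Elsewhere, load the schema back and build tables against it
with open('plain.schema', 'rb') as f:
    loaded = pa.ipc.read_schema(pa.py_buffer(f.read()))
print(loaded)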