import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, isRequired } from './schema.js'
import { snappyUncompress } from './snappy.js'
import { CompressionCodec, Encoding, PageType } from './types.js'

/**
 * @typedef {import('./types.js').SchemaElement} SchemaElement
 * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
 * @typedef {import('./types.js').RowGroup} RowGroup
 */

/**
 * Read a column from the file.
 *
 * @param {ArrayBuffer} arrayBuffer parquet file contents
 * @param {RowGroup} rowGroup row group metadata
 * @param {ColumnMetaData} columnMetadata column metadata
 * @param {SchemaElement[]} schema schema for the file
 * @returns {ArrayLike<any>} array of values
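 * @example
 * // hypothetical usage; assumes `metadata` was already parsed from the file footer:
 * // const rowGroup = metadata.row_groups[0]
 * // const columnMetadata = rowGroup.columns[0].meta_data
 * // const values = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)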
 */
export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
  // find start of column data
  const { codec, dictionary_page_offset, data_page_offset } = columnMetadata
  let columnOffset = dictionary_page_offset
  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
    columnOffset = data_page_offset
  }
  columnOffset = Number(columnOffset) // cast bigint to number

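  // A column chunk is an optional dictionary page followed by one or more
  // data pages. Read pages until all rows in this row group have been seen.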
  // parse column data
  let valuesSeen = 0
  let byteOffset = 0 // byteOffset within the column
  let dictionary = undefined
  const rowData = [] // output values accumulated across data pages
  const rowIndex = [0] // map/list object index
  while (valuesSeen < rowGroup.num_rows) {
    // parse page header
    const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
    byteOffset += headerLength
    if (!header || header.compressed_page_size === undefined) throw new Error('header is undefined')

    // read compressed_page_size bytes starting at offset
    const compressedBytes = new Uint8Array(
      arrayBuffer, columnOffset + byteOffset, header.compressed_page_size
    )
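    // note: this is a zero-copy view into arrayBuffer, not a copy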
    // decompress bytes
    let page
    const uncompressed_page_size = Number(header.uncompressed_page_size) // TODO: unsafe cast
    if (codec === CompressionCodec.GZIP) {
      throw new Error('GZIP compression not supported')
    } else if (codec === CompressionCodec.SNAPPY) {
      page = new Uint8Array(uncompressed_page_size)
      snappyUncompress(compressedBytes, page)
    } else if (codec === CompressionCodec.LZO) {
      throw new Error('LZO compression not supported')
    } else {
      // fail fast on codecs not handled above, rather than falling through
      // to a misleading size-mismatch error
      throw new Error(`parquet unsupported compression codec: ${codec}`)
    }
    if (!page || page.length !== uncompressed_page_size) {
      throw new Error('decompressed page size does not match header')
    }

    // parse page data by type
    if (header.type === PageType.DATA_PAGE) {
      const daph = header.data_page_header
      if (!daph) throw new Error('data page header is undefined')

      const { definitionLevels, repetitionLevels, value } = readDataPage(page, daph, schema, columnMetadata)
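      // note: num_values counts all leaf values in the page, including nulls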
      valuesSeen += daph.num_values

      // construct output values: skip nulls and construct lists
      let values
      if (repetitionLevels.length) {
        // Use repetition levels to construct lists
        if ([Encoding.PLAIN_DICTIONARY, Encoding.RLE_DICTIONARY].includes(daph.encoding)) {
          // TODO: dereference dictionary values
        }
        const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
        const nullValue = false // TODO: unused?
        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
        values = assembleObjects(definitionLevels, repetitionLevels, value, isNull, nullValue, maxDefinitionLevel, rowIndex[0])
      } else if (definitionLevels) {
        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
        // Use definition levels to skip nulls
        let index = 0
        values = []
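        // text decoder to convert dictionary byte arrays to utf-8 strings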
        const decoder = new TextDecoder()
        for (let i = 0; i < definitionLevels.length; i++) {
          if (definitionLevels[i] === maxDefinitionLevel) {
            if (index >= value.length) throw new Error('parquet index out of bounds')
            let v = value[index++]
            // map to dictionary value
            if (dictionary) {
              v = dictionary[v]
              if (v instanceof Uint8Array) {
                try {
                  v = decoder.decode(v)
                } catch (e) {
                  console.warn('parquet failed to decode byte array as string', e)
                }
              }
            }
            values[i] = v
          } else {
            values[i] = undefined
          }
        }
      } else {
        // TODO: use dictionary
        values = value
      }

      // TODO: check that we are at the end of the page
      // push this page's values; a column chunk may span multiple data pages
      for (const v of values) rowData.push(v)
    } else if (header.type === PageType.DICTIONARY_PAGE) {
      const diph = header.dictionary_page_header
      if (!diph) throw new Error('dictionary page header is undefined')

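      // save dictionary values to dereference indices in later data pages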
      dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
    } else {
      throw new Error(`parquet unsupported page type: ${header.type}`)
    }
    byteOffset += header.compressed_page_size // advance to the next page
  }
  return rowData
}