Skip to content

Commit

Permalink
Query filter (#56)
Browse files Browse the repository at this point in the history
* implement ParquetQueryFilter types

* implement parquetQuery filter tests

* implement parquetQuery filter

* filter before ordering

* apply filters before sorting/slicing

* format types

* add deep equality utility

* document and format equals utility

* use deep equality checks

* update filter tests

* support more types for equality

* make $not unary

* ensure arrays are correctly compared

* support both forms of $not

* add operator tests

* Filter operator tests

---------

Co-authored-by: Brian Park <[email protected]>
Co-authored-by: Kenny Daniel <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2024
1 parent cb639a0 commit c9727a4
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 5 deletions.
73 changes: 68 additions & 5 deletions src/query.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
import { parquetReadObjects } from './hyparquet.js'
import { parquetMetadataAsync } from './metadata.js'
import { equals } from './utils.js'

/**
* Wraps parquetRead with orderBy support.
* Wraps parquetRead with filter and orderBy support.
* This is a parquet-aware query engine that can read a subset of rows and columns.
* Accepts an optional orderBy column name to sort the results.
* Accepts optional filter object to filter the results and orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @param {ParquetReadOptions & { orderBy?: string }} options
* @import {ParquetQueryFilter} from '../src/types.d.ts'
* @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
const { file, rowStart, rowEnd, orderBy } = options
const { file, rowStart, rowEnd, orderBy, filter } = options
options.metadata ||= await parquetMetadataAsync(file)

// TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group

if (typeof orderBy === 'string') {
if (filter) {
// TODO: Move filter to parquetRead for performance
const results = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined })
return results
.filter(row => matchQuery(row, filter))
.sort((a, b) => orderBy ? compare(a[orderBy], b[orderBy]) : 0)
.slice(rowStart, rowEnd)
} else if (typeof orderBy === 'string') {
// Fetch orderBy column first
const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })

Expand Down Expand Up @@ -98,3 +107,57 @@ function compare(a, b) {
if (a > b) return 1
return 1 // TODO: how to handle nulls?
}

/**
* Match a record against a query filter
*
* @param {any} record
* @param {ParquetQueryFilter} query
* @returns {boolean}
* @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true
*/
export function matchQuery(record, query = {}) {

if (query.$not) {
return !matchQuery(record, query.$not)
}

if (query.$and) {
return query.$and.every(subQuery => matchQuery(record, subQuery))
}

if (query.$or) {
return query.$or.some(subQuery => matchQuery(record, subQuery))
}

return Object.entries(query).every(([field, condition]) => {
const value = record[field]

if (condition !== null && (Array.isArray(condition) || typeof condition !== 'object')) {
return equals(value, condition)
}

return Object.entries(condition || {}).every(([operator, target]) => {
switch (operator) {
case '$gt':
return value > target
case '$gte':
return value >= target
case '$lt':
return value < target
case '$lte':
return value <= target
case '$ne':
return !equals(value, target)
case '$in':
return Array.isArray(target) && target.includes(value)
case '$nin':
return Array.isArray(target) && !target.includes(value)
case '$not':
return !matchQuery({ [field]: value }, { [field]: target })
default:
return true
}
})
})
}
19 changes: 19 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,22 @@ export interface ParquetReadOptions {
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
}

export type ParquetQueryValue = string | number | boolean | object | null | undefined

export type ParquetQueryOperator = {
$gt?: ParquetQueryValue
$gte?: ParquetQueryValue
$lt?: ParquetQueryValue
$lte?: ParquetQueryValue
$ne?: ParquetQueryValue
$in?: ParquetQueryValue[]
$nin?: ParquetQueryValue[]
}

export interface ParquetQueryFilter {
[key: string]: ParquetQueryValue | ParquetQueryOperator | ParquetQueryFilter[] | undefined
$and?: ParquetQueryFilter[]
$or?: ParquetQueryFilter[]
$not?: ParquetQueryFilter
}
16 changes: 16 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ export function concat(aaa, bbb) {
}
}

/**
* Deep equality comparison
*
* @param {any} a First object to compare
* @param {any} b Second object to compare
* @returns {boolean} true if objects are equal
*/
export function equals(a, b) {
if (a === b) return true
if (a instanceof Uint8Array && b instanceof Uint8Array) return equals(Array.from(a), Array.from(b))
if (!a || !b || typeof a !== typeof b) return false
return Array.isArray(a) && Array.isArray(b)
? a.length === b.length && a.every((v, i) => equals(v, b[i]))
: typeof a === 'object' && Object.keys(a).length === Object.keys(b).length && Object.keys(a).every(k => equals(a[k], b[k]))
}

/**
* Get the byte length of a URL using a HEAD request.
* If requestInit is provided, it will be passed to fetch.
Expand Down
137 changes: 137 additions & 0 deletions test/query.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,141 @@ describe('parquetQuery', () => {
const futureRows = parquetQuery({ file, orderBy: 'nonexistent' })
await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent')
})

it('reads data with filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
])
})

it('reads data with filter and rowStart/rowEnd', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, rowStart: 1, rowEnd: 5 })
expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
})

it('reads data with filter and orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b' })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
])
})

it('reads data with filter, orderBy, and rowStart/rowEnd', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b', rowStart: 1, rowEnd: 2 })
expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
})

it('reads data with $and filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $and: [{ c: 2 }, { e: [1, 2, 3] }] } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
])
})

it('reads data with $or filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $or: [{ c: 2 }, { d: false }] } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})

it('reads data with $not filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $not: { c: 2 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})

it('reads data with $not value filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: { $not: 2 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})

it('reads data with $gt filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $gt: 3 } } })
expect(toJson(rows)).toEqual([
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})


it('reads data with $gte filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $gte: 3 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})

it('reads data with $lt filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $lt: 3 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 2, c: 3, d: true },
])
})

it('reads data with $lte filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $lte: 3 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
])
})

it('reads data with $ne filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $ne: 3 } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 2, c: 3, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})

it('reads data with $in filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $in: [2, 4] } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})

it('reads data with $nin filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { b: { $nin: [2, 4] } } })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})
})

0 comments on commit c9727a4

Please sign in to comment.