
SQL Transforms

FastForward uses Apache DataFusion to run SQL queries on your log data. Every log line becomes a row in a virtual logs table.

The JSON scanner creates one column per field, using the bare field name as the column name and a native Arrow type inferred from the values. For example:

  • level — Utf8View (string)
  • status — Int64 (integer)
  • latency_ms — Float64 (float)

Use bare names directly in SQL:

SELECT * FROM logs WHERE level = 'ERROR'
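Because the columns carry native types, numeric fields can be compared directly without casts. A minimal sketch, assuming the logs contain the example fields level, status and latency_ms listed above (the thresholds are arbitrary):

```sql
-- Assumes status (Int64) and latency_ms (Float64) were inferred as listed above.
-- No CAST is needed: both comparisons run on native numeric columns.
SELECT level, status, latency_ms
FROM logs
WHERE status >= 500 AND latency_ms > 250.0
```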

When a field has mixed types across rows (for example, status is an integer in some rows and a string in others), the builder emits a StructArray conflict column with one child per type, such as status: Struct { int: Int64, str: Utf8View }. Before SQL execution, each conflict column is normalized to a flat Utf8 column via COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str), so every row's value, whatever its original type, reaches SQL as a string. Because the normalized column is a string, use int(status) or float(status) for numeric operations on it.
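A sketch of filtering on a conflict column, assuming status is mixed-type so that it reaches SQL as a Utf8 column. How int() treats strings that are not integers (such as 'OK') is not specified on this page; the comment below assumes it yields NULL.

```sql
-- status is assumed to be a conflict column normalized to Utf8,
-- so int() converts it before the numeric comparison.
-- Assumption: int() returns NULL for non-integer strings, which the WHERE clause then drops.
SELECT *, int(status) AS status_code
FROM logs
WHERE int(status) >= 500
```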

FastForward registers the following custom scalar functions in addition to DataFusion’s built-in functions.

json(column, key) — extract a string value from JSON

json(column: Utf8, key: Utf8) → Utf8

Extracts a field from a raw JSON string column and returns its value as a string. Integer and float values are coerced to their string representation. Returns NULL when the key is not present.

SELECT json(body, 'host') AS host FROM logs
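Since json() returns NULL when the key is absent, DataFusion's COALESCE can supply a default. A short sketch; the placeholder 'unknown' is arbitrary:

```sql
-- Rows whose JSON body has no 'host' key get 'unknown' instead of NULL.
SELECT COALESCE(json(body, 'host'), 'unknown') AS host
FROM logs
```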

json_int(column, key) — extract an integer value from JSON

json_int(column: Utf8, key: Utf8) → Int64

Extracts a field from a raw JSON string column and returns its value as a 64-bit integer. Returns NULL when the key is not present or the value is not a JSON number (e.g. a quoted string "200" returns NULL).

SELECT json_int(body, 'status') AS status FROM logs
WHERE json_int(body, 'status') >= 500
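Producers sometimes emit the same numeric field both as a JSON number and as a quoted string. One way to accept both forms, sketched with DataFusion's TRY_CAST, which returns NULL rather than failing when the text is not a valid integer:

```sql
-- json_int() handles {"status": 200} but returns NULL for {"status": "200"}.
-- json() returns the quoted value as the string '200', which TRY_CAST turns into 200.
SELECT COALESCE(
         json_int(body, 'status'),
         TRY_CAST(json(body, 'status') AS BIGINT)
       ) AS status
FROM logs
```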

json_float(column, key) — extract a float value from JSON

json_float(column: Utf8, key: Utf8) → Float64

Extracts a field from a raw JSON string column and returns its value as a 64-bit float. Integer values are promoted to float. Returns NULL when the key is not present or the value is a quoted string.

SELECT json_float(body, 'duration') AS duration_sec FROM logs
WHERE json_float(body, 'duration') > 1.5
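A small sketch converting the extracted value to milliseconds, assuming (as the duration_sec alias above suggests) that duration is recorded in seconds:

```sql
-- Assumes duration is in seconds; integer durations are promoted to Float64 by json_float().
-- Rows without a numeric duration are dropped.
SELECT json_float(body, 'duration') * 1000.0 AS duration_ms
FROM logs
WHERE json_float(body, 'duration') IS NOT NULL
```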

grok(column, pattern) — grok pattern extraction

grok(column: Utf8, pattern: Utf8) → Struct

Applies a Logstash-style grok pattern to each row and returns a struct with one Utf8 field per named capture (%{PATTERN:name}). Non-matching rows produce NULL struct fields.

Built-in patterns: IP, IPV4, IPV6, NUMBER, INT, BASE10NUM, WORD, NOTSPACE, SPACE, DATA, GREEDYDATA, QUOTEDSTRING, UUID, MAC, URIPATH, URIPATHPARAM, URI, TIMESTAMP_ISO8601, DATE, TIME, LOGLEVEL, HOSTNAME, EMAILADDRESS.

SELECT grok(body, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}')
FROM logs
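To work with individual captures, the struct returned by grok() can be unpacked with DataFusion's get_field() from a subquery. A sketch, assuming body holds lines such as GET /api/users 200; the struct alias req is illustrative. Because every capture is a Utf8 field, the status is converted with TRY_CAST:

```sql
-- Captures come back as Utf8 fields of the req struct.
-- TRY_CAST yields NULL when status is NULL (non-matching rows) or not an integer.
SELECT *,
       get_field(req, 'method') AS method,
       get_field(req, 'path') AS path,
       TRY_CAST(get_field(req, 'status') AS BIGINT) AS status_code
FROM (
  SELECT *, grok(body, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}') AS req
  FROM logs
)
```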

regexp_extract(column, pattern, group) — regex capture group extraction

regexp_extract(column: Utf8, pattern: Utf8, group: Int64) → Utf8

Returns the capture group at the given index (1-based), or the full match when group is 0. Returns NULL when the pattern does not match or the group index is out of range.

SELECT regexp_extract(body, 'status=(\d+)', 1) AS status_code FROM logs
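The group argument also selects among several captures. A sketch with a hypothetical key=value layout (user=... ip=...): group 0 returns the whole match, while 1 and 2 return the individual captures. When such a query is placed in a pipeline config, a YAML block scalar (transform: |) passes backslashes through unchanged, whereas inside a double-quoted YAML string sequences such as \w and \d are invalid escapes.

```sql
-- Hypothetical input line: "login ok user=alice ip=10.0.0.7"
-- → kv_pair = 'user=alice ip=10.0.0.7', user_name = 'alice', client_ip = '10.0.0.7'
SELECT regexp_extract(body, 'user=(\w+) ip=(\S+)', 0) AS kv_pair,
       regexp_extract(body, 'user=(\w+) ip=(\S+)', 1) AS user_name,
       regexp_extract(body, 'user=(\w+) ip=(\S+)', 2) AS client_ip
FROM logs
```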

geo_lookup(ip) — look up geolocation data for an IP address

geo_lookup(ip: Utf8) → Struct

Looks up an IP address in the configured MaxMind database and returns a struct with fields: country_code (Utf8), country_name (Utf8), city (Utf8), region (Utf8), latitude (Float64), longitude (Float64), asn (Int64), org (Utf8). All fields are nullable; private or unresolvable IPs return NULL. Requires a geo_database enrichment in the pipeline config.

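Use DataFusion's built-in get_field() to extract individual fields. The following pipeline config enables the geo_database enrichment and extracts the city and country code: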

enrichment:
  - type: geo_database
    format: mmdb
    path: /data/GeoLite2-City.mmdb
transform: |
  SELECT *, get_field(geo, 'city') AS city, get_field(geo, 'country_code') AS country
  FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)
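Because private and unresolvable IPs produce NULL, the lookup result can also drive filtering. A sketch that keeps only rows with a resolved country code and adds the ASN and organization, reusing the source_ip column from the config above:

```sql
-- get_field() returns NULL when the lookup did not resolve a country,
-- so the WHERE clause drops private and unresolvable addresses.
SELECT *,
       get_field(geo, 'country_code') AS country,
       get_field(geo, 'asn') AS asn,
       get_field(geo, 'org') AS org
FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)
WHERE get_field(geo, 'country_code') IS NOT NULL
```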

Common transform patterns:

Filter by level:

transform: "SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')"

Drop fields:

transform: "SELECT * EXCEPT (request_id, trace_id) FROM logs"

Rename a field:

transform: "SELECT duration_ms AS latency_ms, * EXCEPT (duration_ms) FROM logs"

Add a computed field:

transform: |
  SELECT *,
    CASE WHEN duration_ms > 200 THEN 'slow' ELSE 'fast' END AS speed
  FROM logs

Aggregate by level:

transform: |
  SELECT level, COUNT(*) AS count, AVG(duration_ms) AS avg_latency
  FROM logs
  GROUP BY level

Where to go next:

  • See all YAML options: YAML Reference
  • Understand the scanner: Scanner Deep Dive (interactive)
  • Deploy to production: Kubernetes DaemonSet
  • Debug transform issues: Troubleshooting