SQL Transforms
FastForward uses Apache DataFusion to run SQL queries on your log data. Every log
line becomes a row in a virtual logs table.
Column naming
The JSON scanner creates columns using bare field names with native Arrow types:
- level: Utf8View (string)
- status: Int64 (integer)
- latency_ms: Float64 (float)
Use bare names directly in SQL:
    SELECT * FROM logs WHERE level = 'ERROR'

When a field has mixed types across rows (e.g., status is sometimes an int,
sometimes a string), the builder emits a StructArray conflict column
(status: Struct { int: Int64, str: Utf8View }).
Before SQL execution, conflict columns are normalized to flat Utf8 columns
via COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str).
Use int(status) or float(status) for numeric operations on these columns.
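
The normalization step above can be pictured with a small Python model. This is a sketch of the documented COALESCE order only (int, then float, then str), not FastForward's implementation; the function name `normalize_conflict` and the dict representation of a struct cell are assumptions made for illustration, and Python's number-to-string formatting may differ from Arrow's cast.

```python
from typing import Optional


def normalize_conflict(cell: dict) -> Optional[str]:
    """Model of COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str).

    `cell` is a hypothetical stand-in for one row of a conflict struct:
    at most one of the keys "int", "float", "str" holds a non-null value.
    """
    for key in ("int", "float", "str"):
        value = cell.get(key)
        if value is not None:
            # First non-null branch wins, rendered as text.
            return str(value)
    return None  # every branch was null


# One mixed-type "status" column, row by row.
rows = [{"int": 200}, {"str": "OK"}, {"float": 1.5}, {}]
normalized = [normalize_conflict(r) for r in rows]
```

After this step every row is either text or NULL, which is why numeric comparisons on such a column need int(status) or float(status).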
Custom UDFs
FastForward registers the following custom scalar functions in addition to DataFusion’s built-in functions.
json(column, key): extract a string value from JSON

    json(column: Utf8, key: Utf8) → Utf8

Extracts a field from a raw JSON string column and returns its value as a
string. Integer and float values are coerced to their string representation.
Returns NULL when the key is not present.
    SELECT json(body, 'host') AS host FROM logs

json_int(column, key): extract an integer value from JSON

    json_int(column: Utf8, key: Utf8) → Int64

Extracts a field from a raw JSON string column and returns its value as a
64-bit integer. Returns NULL when the key is not present or the value is not
a JSON number (e.g. a quoted string "200" returns NULL).
    SELECT json_int(body, 'status') AS status FROM logs WHERE json_int(body, 'status') >= 500

json_float(column, key): extract a float value from JSON

    json_float(column: Utf8, key: Utf8) → Float64

Extracts a field from a raw JSON string column and returns its value as a
64-bit float. Integer values are promoted to float. Returns NULL when the
key is not present or the value is a quoted string.
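
The type rules of the three JSON extractors can be compared side by side in a short Python model. It is a sketch of the documented behavior, not the actual UDF code. The name `json_str` stands in for the SQL function json to avoid clashing with Python's json module, and the handling of cases this page does not specify (booleans, nested values, non-integer numbers passed to json_int) is an assumption: the sketch returns None for them.

```python
import json
from typing import Optional


def _field(raw: str, key: str):
    """Return the top-level value for `key`, or None if it cannot be read."""
    try:
        doc = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return doc.get(key) if isinstance(doc, dict) else None


def _is_number(value) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def json_str(raw: str, key: str) -> Optional[str]:
    """Model of json(): strings pass through, numbers become text."""
    value = _field(raw, key)
    if isinstance(value, str):
        return value
    if _is_number(value):
        return str(value)
    return None  # missing key; other JSON types are an assumption


def json_int(raw: str, key: str) -> Optional[int]:
    """Model of json_int(): only unquoted integers are returned."""
    value = _field(raw, key)
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return None  # missing key, quoted string, or (assumed) non-integer


def json_float(raw: str, key: str) -> Optional[float]:
    """Model of json_float(): integers are promoted to float."""
    value = _field(raw, key)
    return float(value) if _is_number(value) else None


body = '{"host": "web-1", "status": 200, "code": "200", "duration": 2}'
```

With this body, the unquoted status is readable through all three functions, while the quoted code is visible only to the string extractor.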
    SELECT json_float(body, 'duration') AS duration_sec FROM logs WHERE json_float(body, 'duration') > 1.5

grok(column, pattern): grok pattern extraction

    grok(column: Utf8, pattern: Utf8) → Struct

Applies a Logstash-style grok pattern to each row and returns a struct with
one Utf8 field per named capture (%{PATTERN:name}). Non-matching rows
produce NULL struct fields.
Built-in patterns: IP, IPV4, IPV6, NUMBER, INT, BASE10NUM,
WORD, NOTSPACE, SPACE, DATA, GREEDYDATA, QUOTEDSTRING, UUID,
MAC, URIPATH, URIPATHPARAM, URI, TIMESTAMP_ISO8601, DATE,
TIME, LOGLEVEL, HOSTNAME, EMAILADDRESS.
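
To make the %{PATTERN:name} syntax concrete, the following Python sketch expands a grok pattern into a regular expression with named capture groups and returns one field per capture. It illustrates the general grok technique rather than FastForward's implementation. The three regex definitions in `PATTERNS` are simplified stand-ins chosen for this example, not the real built-in definitions, and unanchored matching is an assumption.

```python
import re
from typing import Dict, Optional

# Simplified, hypothetical definitions for three of the built-in names.
PATTERNS = {
    "WORD": r"\w+",
    "NUMBER": r"[+-]?\d+(?:\.\d+)?",
    "URIPATH": r"/[^\s?#]*",
}

# Matches %{NAME} or %{NAME:field}.
_TOKEN = re.compile(r"%\{(\w+)(?::(\w+))?\}")


def compile_grok(pattern: str) -> "re.Pattern[str]":
    """Expand each %{NAME:field} token into a named capture group."""
    def expand(match: "re.Match[str]") -> str:
        name, field = match.group(1), match.group(2)
        body = PATTERNS[name]
        # Tokens without a field name match but do not capture.
        return f"(?P<{field}>{body})" if field else f"(?:{body})"

    return re.compile(_TOKEN.sub(expand, pattern))


def grok(text: str, pattern: str) -> Dict[str, Optional[str]]:
    """Return one string field per named capture; all None on no match."""
    regex = compile_grok(pattern)
    match = regex.search(text)
    if match is None:
        return {name: None for name in regex.groupindex}
    return match.groupdict()


result = grok("GET /api/users 200",
              "%{WORD:method} %{URIPATH:path} %{NUMBER:status}")
```

Every captured value is a string, including numeric-looking ones such as status, which matches the one-Utf8-field-per-capture description above.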
    SELECT grok(body, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}') FROM logs

regexp_extract(column, pattern, group): regex capture group extraction

    regexp_extract(column: Utf8, pattern: Utf8, group: Int64) → Utf8

Returns the capture group at the given index (1-based), or the full match
when group is 0. Returns NULL when the pattern does not match or the
group index is out of range.
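
The group-index rules can be checked against a small Python model built on the standard re module. This is a sketch of the documented semantics only: FastForward's regex dialect may differ from Python's, and the use of an unanchored search is an assumption.

```python
import re
from typing import Optional


def regexp_extract(text: str, pattern: str, group: int) -> Optional[str]:
    """Model of regexp_extract(): group 0 is the full match, 1..n are captures."""
    match = re.search(pattern, text)
    if match is None:
        return None  # pattern did not match
    if group < 0 or group > match.re.groups:
        return None  # group index out of range
    return match.group(group)


line = "GET /x status=503 ok"
status_code = regexp_extract(line, r"status=(\d+)", 1)
full_match = regexp_extract(line, r"status=(\d+)", 0)
```

The range check runs before `match.group()` because Python raises IndexError for an unknown group, whereas the documented function returns NULL.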
    SELECT regexp_extract(body, 'status=(\d+)', 1) AS status_code FROM logs

geo_lookup(ip): GeoIP enrichment

    geo_lookup(ip: Utf8) → Struct

Looks up an IP address in the configured MaxMind database and returns a struct
with fields: country_code (Utf8), country_name (Utf8), city (Utf8),
region (Utf8), latitude (Float64), longitude (Float64), asn (Int64),
org (Utf8). All fields are nullable; private or unresolvable IPs return
NULL. Requires a geo_database enrichment in the pipeline config.
Use get_field() to extract individual fields:
    enrichment:
      - type: geo_database
        format: mmdb
        path: /data/GeoLite2-City.mmdb
    transform: |
      SELECT *,
             get_field(geo, 'city') AS city,
             get_field(geo, 'country_code') AS country
      FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)

Examples

Filter by severity

    transform: "SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')"

Drop fields

    transform: "SELECT * EXCEPT (request_id, trace_id) FROM logs"

Rename fields

    transform: "SELECT duration_ms AS latency_ms, * EXCEPT (duration_ms) FROM logs"

Computed fields

    transform: |
      SELECT *,
             CASE WHEN duration_ms > 200 THEN 'slow' ELSE 'fast' END AS speed
      FROM logs

Aggregation

    transform: |
      SELECT level, COUNT(*) as count, AVG(duration_ms) as avg_latency
      FROM logs
      GROUP BY level

What’s next

| Topic | Where to go |
|---|---|
| See all YAML options | YAML Reference |
| Understand the scanner | Scanner Deep Dive (interactive) |
| Deploy to production | Kubernetes DaemonSet |
| Debug transform issues | Troubleshooting |