SQL Transforms

logfwd uses Apache DataFusion to run SQL queries on your log data. Every log line becomes a row in a virtual logs table.

Column naming

The JSON scanner creates columns using bare field names with native Arrow types:

  • level — Utf8View (string)
  • status — Int64 (integer)
  • latency_ms — Float64 (float)

Use bare names directly in SQL:

SELECT * FROM logs WHERE level = 'ERROR'

When a field has mixed types across rows (e.g., status is sometimes an int, sometimes a string), the builder emits a StructArray conflict column (status: Struct { int: Int64, str: Utf8View }). Before SQL execution, conflict columns are normalized to flat Utf8 columns via COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str). Use int(status) or float(status) for numeric operations on these columns.
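For example, if status arrived as both 200 and "200" across rows and was normalized into a flat Utf8 column as described above, a numeric filter goes through int():

```sql
-- status is Utf8 after conflict normalization; int() recovers the numeric view
SELECT * FROM logs WHERE int(status) >= 500
```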

Custom UDFs

logfwd registers the following custom scalar functions in addition to DataFusion’s built-in functions.

json(column, key) — extract a string value from JSON

json(column: Utf8, key: Utf8) → Utf8

Extracts a field from a raw JSON string column and returns its value as a string. Integer and float values are coerced to their string representation. Returns NULL when the key is not present.

SELECT json(_raw, 'host') AS host FROM logs

json_int(column, key) — extract an integer value from JSON

json_int(column: Utf8, key: Utf8) → Int64

Extracts a field from a raw JSON string column and returns its value as a 64-bit integer. Returns NULL when the key is not present or the value is not a JSON number (e.g. a quoted string "200" returns NULL).

SELECT json_int(_raw, 'status') AS status FROM logs
  WHERE json_int(_raw, 'status') >= 500

json_float(column, key) — extract a float value from JSON

json_float(column: Utf8, key: Utf8) → Float64

Extracts a field from a raw JSON string column and returns its value as a 64-bit float. Integer values are promoted to float. Returns NULL when the key is not present or the value is a quoted string.

SELECT json_float(_raw, 'duration') AS duration_sec FROM logs
  WHERE json_float(_raw, 'duration') > 1.5

grok(column, pattern) — grok pattern extraction

grok(column: Utf8, pattern: Utf8) → Struct

Applies a Logstash-style grok pattern to each row and returns a struct with one Utf8 field per named capture (%{PATTERN:name}). Non-matching rows produce NULL struct fields.

Built-in patterns: IP, IPV4, IPV6, NUMBER, INT, BASE10NUM, WORD, NOTSPACE, SPACE, DATA, GREEDYDATA, QUOTEDSTRING, UUID, MAC, URIPATH, URIPATHPARAM, URI, TIMESTAMP_ISO8601, DATE, TIME, LOGLEVEL, HOSTNAME, EMAILADDRESS.

SELECT grok(message, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}')
  FROM logs
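Because grok returns a struct, individual captures can be projected out with get_field — a sketch; this page only shows get_field with geo_lookup structs, so treat its use on grok output as an assumption:

```sql
-- Hypothetical: project the 'status' capture out of the grok struct
SELECT get_field(grok(message, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}'),
                 'status') AS status
FROM logs
```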

regexp_extract(column, pattern, group) — regex capture group extraction

regexp_extract(column: Utf8, pattern: Utf8, group: Int64) → Utf8

Returns the capture group at the given index (1-based), or the full match when group is 0. Returns NULL when the pattern does not match or the group index is out of range.

SELECT regexp_extract(message, 'status=(\d+)', 1) AS status_code FROM logs
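Passing 0 as the group index returns the entire match rather than a capture group, for example:

```sql
-- group 0 yields the full matched text, e.g. 'status=404'
SELECT regexp_extract(message, 'status=(\d+)', 0) AS status_kv FROM logs
```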

geo_lookup(ip) — GeoIP enrichment

geo_lookup(ip: Utf8) → Struct

Looks up an IP address in the configured MaxMind database and returns a struct with fields: country_code (Utf8), country_name (Utf8), city (Utf8), region (Utf8), latitude (Float64), longitude (Float64), asn (Int64), org (Utf8). All fields are nullable — private/unresolvable IPs return NULL. Requires a geo_database enrichment in the pipeline config.

Use get_field() to extract individual fields:

enrichment:
  - type: geo_database
    path: /data/GeoLite2-City.mmdb
transform: |
  SELECT *, get_field(geo, 'city') AS city, get_field(geo, 'country_code') AS country
  FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)

Examples

Filter by severity

transform: "SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')"

Drop fields

transform: "SELECT * EXCEPT (request_id, trace_id) FROM logs"

Rename fields

transform: "SELECT duration_ms AS latency_ms, * EXCEPT (duration_ms) FROM logs"

Computed fields

transform: |
  SELECT *,
    CASE WHEN duration_ms > 200 THEN 'slow' ELSE 'fast' END AS speed
  FROM logs

Aggregation

transform: |
  SELECT level, COUNT(*) AS count, AVG(duration_ms) AS avg_latency
  FROM logs
  GROUP BY level
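
To keep only the aggregated groups that matter, a HAVING clause can be added — a sketch assuming standard SQL support in the engine; the threshold is illustrative:

```yaml
transform: |
  SELECT level, COUNT(*) AS count
  FROM logs
  GROUP BY level
  HAVING COUNT(*) > 100
```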