logfwd

A high-performance log forwarder built in Rust. Tails log files, parses JSON and Kubernetes CRI format with portable SIMD, transforms every batch with DataFusion SQL, and ships to OTLP, Elasticsearch, Loki, HTTP, or stdout.

log files → SIMD parse → Arrow RecordBatch → SQL transform → output

logfwd is a single static binary with no runtime dependencies. Point it at log files, write a SQL query to filter and reshape the data, and forward the results to any OTLP-compatible collector — or directly to Elasticsearch, Loki, or stdout. SQL transforms are the core idea: instead of learning a vendor-specific DSL, you write standard SQL to control exactly what gets shipped.

Get started

  1. Installation — Binary download, Docker, or build from source
  2. Quick Start — Working pipeline in 60 seconds, no external dependencies
  3. Your First Pipeline — Production config with monitoring and validation

Configure

  1. Configuration Reference: every YAML field, input/output type, and option
  2. SQL Transforms: filter, reshape, and enrich with DataFusion SQL

Deploy

  1. Docker: standalone container with mounted config and logs
  2. Kubernetes: DaemonSet that tails container logs on every node

Learn more

  1. Troubleshooting: common errors, debug mode, diagnostics API

Installation

Binary download

Download the latest release from GitHub Releases. Binaries are available for:

Platform                     Artifact
Linux x86_64                 logfwd-linux-amd64
Linux ARM64                  logfwd-linux-arm64
macOS x86_64 (Intel)         logfwd-darwin-amd64
macOS ARM64 (Apple Silicon)  logfwd-darwin-arm64
# Example: Linux x86_64
curl -fsSL https://github.com/strawgate/memagent/releases/latest/download/logfwd-linux-amd64 -o logfwd
chmod +x logfwd
sudo mv logfwd /usr/local/bin/

Docker

docker run --rm \
  -v /var/log:/var/log:ro \
  -v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
  ghcr.io/strawgate/memagent:latest --config /etc/logfwd/config.yaml

Images are published to ghcr.io/strawgate/memagent for linux/amd64 and linux/arm64. See Docker deployment for compose files and volume configuration.

From source

Requires the Rust stable toolchain (1.85+).

git clone https://github.com/strawgate/memagent.git
cd memagent
cargo build --release -p logfwd

The binary is at ./target/release/logfwd. Copy it wherever you need:

sudo cp target/release/logfwd /usr/local/bin/

Kubernetes

A ready-to-apply DaemonSet manifest is included in the repo:

kubectl apply -f deploy/daemonset.yml

See the Kubernetes deployment guide for resource sizing and collector integration.

Verify installation

logfwd --version
logfwd --help

Next step

Head to the Quick Start to run your first pipeline.

Quick Start

Get a working logfwd pipeline in 60 seconds, then build on it.

Prerequisites

You need the logfwd binary. See Installation for all options, or grab it quickly:

# Download the latest release (macOS Apple Silicon shown)
curl -fsSL https://github.com/strawgate/memagent/releases/latest/download/logfwd-darwin-arm64 -o logfwd
chmod +x logfwd

# Or build from source (Rust 1.85+)
cargo build --release -p logfwd && cp target/release/logfwd .

Verify it works:

./logfwd --version

Stage 1: See log output

Generate synthetic JSON log lines and print them to your terminal.

Generate test data:

./logfwd --generate-json 10000 logs.json

This creates 10,000 JSON log lines with fields like level, message, status, duration_ms, and service.
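If you want to craft test data by hand instead of using the flag, the same shape can be produced with a few lines of Python. This is a sketch with illustrative values; the exact fields and distributions logfwd's generator emits may differ:

```python
import json
import random

def generate_logs(n):
    """Yield synthetic JSON log lines shaped like the documented
    --generate-json output: level, message, status, duration_ms, service."""
    levels = ["DEBUG", "INFO", "WARN", "ERROR"]
    for i in range(n):
        yield json.dumps({
            "level": random.choice(levels),
            "message": f"request handled GET /api/v1/users/{10000 + i}",
            "status": random.choice([200, 200, 200, 429, 500, 503]),
            "duration_ms": random.randint(1, 100),
            "service": "myapp",
        })

# Write 10,000 newline-delimited JSON records, one per line.
with open("logs.json", "w") as f:
    for line in generate_logs(10_000):
        f.write(line + "\n")
```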

Create a config file:

# config.yaml
input:
  type: file
  path: logs.json
  format: json

output:
  type: stdout
  format: console

Run it:

./logfwd --config config.yaml

You’ll see a startup banner followed by colored output for every log line:

logfwd v0.1.0

  ✓  default
     in   file  logs.json
     out  stdout

ready · 1 pipeline

10:30:00.000Z  INFO   request handled GET /api/v1/users/10000  duration_ms=1 request_id=... service=myapp status=200
10:30:00.000Z  WARN   request handled POST /api/v2/orders/10015  duration_ms=87 request_id=... service=myapp status=429
10:30:00.000Z  ERROR  request handled GET /health/10021  duration_ms=40 request_id=... service=myapp status=503
10:30:00.000Z  DEBUG  request handled GET /api/v1/users/10033  duration_ms=3 request_id=... service=myapp status=200
...

logfwd parsed every JSON line, detected field types automatically (strings, integers, floats), built Arrow RecordBatches, and printed them in a human-readable format. All 10,000 lines stream through in under a second.

Note: logfwd exits when it reaches the end of a finite file. In production you’d point it at a log file that’s actively being appended to, and logfwd will tail it continuously — like tail -f, but with parsing and SQL.
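Tail-style following with a persisted offset can be sketched in a few lines of Python. This is a toy illustration of the idea, not logfwd's actual implementation:

```python
import time

def follow(path, offset=0, polls=3, interval=0.1):
    """Read new lines appended to `path`, starting at byte `offset`.

    A toy tail -f with checkpointing: returns the complete lines seen plus
    the final byte offset, which a real agent would persist so a restart
    does not reprocess data it has already shipped."""
    lines = []
    with open(path, "r") as f:
        f.seek(offset)
        for _ in range(polls):
            chunk = f.readline()
            while chunk:
                if chunk.endswith("\n"):       # only complete lines
                    lines.append(chunk.rstrip("\n"))
                chunk = f.readline()
            time.sleep(interval)
        return lines, f.tell()

# Process a file, remember the offset, then resume after new writes.
with open("app.log", "w") as f:
    f.write("line 1\nline 2\n")
seen, pos = follow("app.log")
with open("app.log", "a") as f:
    f.write("line 3\n")
more, pos = follow("app.log", offset=pos)
```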


Stage 2: Filter with SQL

Now add a SQL transform to keep only what matters. This is the core reason to use logfwd — every batch of parsed records becomes a DataFusion SQL table named logs.

First, regenerate the test data — logfwd tracks file positions between runs so it doesn’t reprocess data it has already seen:

./logfwd --generate-json 10000 logs.json

Update your config:

# config.yaml
input:
  type: file
  path: logs.json
  format: json

transform: |
  SELECT level, message, status, duration_ms
  FROM logs
  WHERE level = 'ERROR' AND duration_ms > 50

output:
  type: stdout
  format: console

Run it:

./logfwd --config config.yaml

Now you see far fewer lines — only errors with slow durations:

ERROR  request handled GET /api/v2/products/10049  status=500 duration_ms=92
ERROR  request handled POST /api/v1/orders/10121  status=503 duration_ms=78
...

Only the four columns from the SELECT appear — level, message, status, duration_ms. Everything else was filtered out before it reached the output. In production, this means you only ship the logs you care about — saving bandwidth, storage, and money.

Try a more advanced transform:

transform: |
  SELECT
    level,
    regexp_extract(message, '(GET|POST|PUT|DELETE) (\S+)', 2) AS path,
    status,
    duration_ms
  FROM logs
  WHERE status >= 400

This extracts the URL path from the message with a regex and keeps only 4xx/5xx responses. The full SQL surface works: JOIN, GROUP BY, HAVING, subqueries.

Note: ORDER BY is valid SQL and works correctly within each batch. However, logfwd processes data in streaming batches, so ordering only applies within a single batch, not globally across all data.
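A small Python illustration of why per-batch ordering differs from a global sort:

```python
def sort_per_batch(batches, key):
    """Apply ORDER BY within each streaming batch independently,
    the way a per-batch engine would."""
    return [sorted(batch, key=key) for batch in batches]

# Two batches of duration_ms values arriving over time.
batches = [[30, 10, 20], [5, 40, 15]]

per_batch = sort_per_batch(batches, key=lambda x: x)
flat = [x for b in per_batch for x in b]   # [10, 20, 30, 5, 15, 40]
global_sorted = sorted(flat)               # [5, 10, 15, 20, 30, 40]

# Each batch is ordered, but the concatenation is not globally ordered.
assert flat != global_sorted
```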

Built-in UDFs: int(), float(), regexp_extract(), grok(), json(), json_int(), json_float(). The geo_lookup() UDF is also available when a geo-IP database is configured. See the SQL Transforms guide for the complete reference.


Stage 3: Ship to a collector

In production, logfwd sends OTLP protobuf to an OpenTelemetry Collector, Grafana Alloy, or any OTLP-compatible backend. Let’s try it with logfwd’s built-in blackhole receiver — it accepts OTLP data and discards it, so you can test the full pipeline without external infrastructure.

Start the blackhole receiver:

./logfwd --blackhole &
# logfwd blackhole starting on 127.0.0.1:4318

Regenerate test data and update your config to send OTLP:

./logfwd --generate-json 10000 logs.json
# config.yaml
input:
  type: file
  path: logs.json
  format: json

transform: |
  SELECT level, message, status, duration_ms
  FROM logs
  WHERE level IN ('ERROR', 'WARN')

output:
  type: otlp
  endpoint: http://127.0.0.1:4318/v1/logs
  compression: zstd

Run it:

./logfwd --config config.yaml

logfwd parses the JSON, runs the SQL filter, encodes matching records as OTLP protobuf with zstd compression, and ships them over HTTP. The blackhole receiver accepts everything.

To ship to a real collector, replace the endpoint:

output:
  type: otlp
  endpoint: http://otel-collector:4318/v1/logs   # your real collector
  compression: zstd

logfwd works out of the box with the OpenTelemetry Collector, Grafana Alloy, and any other OTLP-compatible backend.

Validate before deploying

Before running in production, verify your config. --validate catches YAML errors; --dry-run goes further and compiles the SQL against the Arrow schema, catching column name typos and type mismatches before any data flows.

./logfwd --config config.yaml --validate
#   ready: default
# config ok: 1 pipeline(s)

./logfwd --config config.yaml --dry-run
#   ready: default
# dry run ok: 1 pipeline(s)

What’s next

Guide                    What you'll learn
Your First Pipeline      Production config with monitoring, multi-pipeline setup, CRI format
SQL Transforms           Full SQL reference: JOINs, UDFs, enrichment tables, column naming
Configuration Reference  Every YAML field, input/output type, and option
Kubernetes Deployment    DaemonSet, resource sizing, OTLP collector integration
Troubleshooting          Common errors, debug mode, diagnostics API

Your First Pipeline

This guide walks through setting up logfwd to tail application logs, filter by severity, and forward to an OpenTelemetry collector.

The config

input:
  type: file
  path: /var/log/app/*.log
  format: json

transform: |
  SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')

output:
  type: otlp
  endpoint: https://otel-collector:4318
  protocol: http
  compression: zstd

server:
  diagnostics: 0.0.0.0:9090

Run it

logfwd --config pipeline.yaml

Monitor it

# Health check
curl http://localhost:9090/health

# Aggregate stats (JSON)
curl http://localhost:9090/api/stats | jq .

# Pipeline details (JSON)
curl http://localhost:9090/api/pipelines | jq .

Validate config without running

logfwd --config pipeline.yaml --validate
# config ok: 1 pipeline(s)

logfwd --config pipeline.yaml --dry-run
# logfwd v0.1.0
#   pipeline default: 1 input(s) -> SELECT * FROM logs WHERE ... -> 1 output(s)
#   ready: default
# dry run ok: 1 pipeline(s) constructed

Configuration Reference

logfwd is configured with a YAML file passed via --config <path>.

Overview

logfwd supports two layout styles:

  • Simple — single pipeline with top-level input, transform, and output keys.
  • Advanced — multiple named pipelines under a pipelines map.

Environment variables are expanded using ${VAR} syntax anywhere in the file. If a variable is not set the placeholder is left as-is.


Simple layout

input:
  type: file
  path: /var/log/app/*.log
  format: json

transform: SELECT level_str, message_str, status_int FROM logs WHERE status_int >= 400

output:
  type: otlp
  endpoint: otel-collector:4317
  compression: zstd

server:
  diagnostics: 0.0.0.0:9090
  log_level: info

Advanced layout

pipelines:
  errors:
    inputs:
      - name: pod_logs
        type: file
        path: /var/log/pods/**/*.log
        format: cri
    transform: SELECT * FROM logs WHERE level_str = 'ERROR'
    outputs:
      - type: otlp
        endpoint: otel-collector:4317

  debug:
    inputs:
      - type: file
        path: /var/log/pods/**/*.log
        format: cri
    outputs:
      - type: stdout
        format: json

server:
  diagnostics: 0.0.0.0:9090

The two layouts cannot be mixed: specifying both input/output at the top level and a pipelines map is a validation error.


Input configuration

Each pipeline requires at least one input. Use a single mapping for one input or a YAML sequence for multiple inputs.

Common fields

Field   Type    Required  Description
type    string  Yes       Input type. See Input types.
name    string  No        Friendly name shown in diagnostics.
format  string  No        Log format. See Formats. Defaults to auto.

file input

Tail one or more log files that match a glob pattern.

Field  Type    Required  Description
path   string  Yes       Glob pattern, e.g. /var/log/pods/**/*.log.

input:
  type: file
  path: /var/log/pods/**/*.log
  format: cri

udp input (not yet implemented)

Listen for log lines on a UDP socket.

Field   Type    Required  Description
listen  string  Yes       host:port, e.g. 0.0.0.0:514.

input:
  type: udp
  listen: 0.0.0.0:514
  format: syslog

tcp input (not yet implemented)

Accept log lines on a TCP socket.

Field   Type    Required  Description
listen  string  Yes       host:port, e.g. 0.0.0.0:5140.

input:
  type: tcp
  listen: 0.0.0.0:5140
  format: json

otlp input (not yet implemented)

Receive OTLP log records from another agent or SDK.

No extra fields required; the listen address will be configurable in a future release.


Input types

Value  Status       Description
file   Implemented  Tail files matching a glob pattern.
udp    Planned      Receive log lines over UDP.
tcp    Planned      Accept log lines over TCP.
otlp   Planned      Receive OTLP logs.

Formats

The format field controls how raw bytes from the input are parsed into log records.

Value    Description
auto     Auto-detect (default). Tries CRI first, then JSON, then raw.
cri      CRI container log format (<timestamp> <stream> <flags> <message>). Multi-line log reassembly via the P partial flag is supported.
json     Newline-delimited JSON. Each line must be a single JSON object.
raw      Treat each line as an opaque string stored in _raw_str.
logfmt   Key=value pairs (e.g. level=info msg="hello"). Not yet implemented.
syslog   RFC 5424 syslog. Not yet implemented.
console  Human-readable coloured output for interactive debugging. Output mode only.
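As a rough illustration of the CRI format above, a single line splits into four fields. This simplified Python parser ignores the multi-line reassembly the real parser performs:

```python
def parse_cri(line):
    """Parse one Kubernetes CRI log line:
    <timestamp> <stream> <flags> <message>.

    A simplified sketch; the real parser also reassembles multi-line
    logs via the P (partial) flag, which this version only reports."""
    timestamp, stream, flags, message = line.split(" ", 3)
    return {
        "_time": timestamp,
        "_stream": stream,        # stdout or stderr
        "partial": flags == "P",  # P = more fragments follow; F = full line
        "message": message,
    }

rec = parse_cri("2024-01-01T10:30:00.000000000Z stdout F request handled GET /health")
```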

Output configuration

Each pipeline requires at least one output.

Common fields

Field  Type    Required  Description
type   string  Yes       Output type. See Output types.
name   string  No        Friendly name shown in diagnostics.

otlp output

Send log records as OTLP protobuf to an OpenTelemetry collector.

Field        Type    Required  Default  Description
endpoint     string  Yes       —        Collector address, e.g. otel-collector:4317 (gRPC) or http://otel-collector:4318 (HTTP).
protocol     string  No        http     http or grpc.
compression  string  No        none     zstd to compress the request body.

output:
  type: otlp
  endpoint: otel-collector:4317
  protocol: grpc
  compression: zstd

http output

POST log records as newline-delimited JSON to an HTTP endpoint.

Field        Type    Required  Description
endpoint     string  Yes       Full URL, e.g. http://ingest.example.com/logs.
compression  string  No        zstd to compress the request body.

output:
  type: http
  endpoint: http://ingest.example.com/logs
  compression: zstd

stdout output

Print records to standard output for local debugging.

Field   Type    Required  Default  Description
format  string  No        json     json (newline-delimited JSON) or console (coloured text).

output:
  type: stdout
  format: console

elasticsearch output (stub)

Ship to Elasticsearch via the bulk API. Not yet functional.

Field     Type    Required  Description
endpoint  string  Yes       Elasticsearch base URL.

loki output (stub)

Push to Grafana Loki. Not yet functional.

Field     Type    Required  Description
endpoint  string  Yes       Loki push URL.

file_out output (partial)

Write records to a file.

Field  Type    Required  Description
path   string  Yes       Destination file path.

parquet output (stub)

Write records to Parquet files. Not yet functional.

Field  Type    Required  Description
path   string  Yes       Destination file path.

Output types

Value          Status       Description
otlp           Implemented  OTLP protobuf over HTTP or gRPC.
http           Implemented  JSON lines over HTTP POST.
stdout         Implemented  Print to stdout (JSON or coloured text).
elasticsearch  Stub         Elasticsearch bulk API.
loki           Stub         Grafana Loki push API.
file_out       Partial      Write to a file.
parquet        Stub         Write Parquet files.

SQL transform

The optional transform field contains a DataFusion SQL query that is applied to every Arrow RecordBatch produced by the scanner. The source table is always named logs.

transform: SELECT level_str, message_str, status_int FROM logs WHERE status_int >= 400

Multi-line SQL is supported with YAML block scalars:

transform: |
  SELECT
    level_str,
    message_str,
    regexp_extract(message_str, 'request_id=([a-f0-9-]+)', 1) AS request_id_str,
    status_int
  FROM logs
  WHERE level_str IN ('ERROR', 'WARN')
    AND status_int >= 400

Column naming convention

The scanner maps each JSON field to one or more typed Arrow columns following the {field}_{type} naming convention:

JSON value type  Arrow column type             Column name pattern  Example
String           StringArray                   {field}_str          level_str
Integer          Int64Array                    {field}_int          status_int
Float            Float64Array                  {field}_float        latency_ms_float
Boolean          StringArray ("true"/"false")  {field}_str          enabled_str
Null             null in all type columns      —                    —
Object / Array   StringArray (raw JSON)        {field}_str          metadata_str

When a field contains mixed types across rows, separate columns are emitted: status_int and status_str can coexist in the same batch.
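A Python sketch of the naming rule (the real scanner builds typed Arrow arrays; this only derives the column names):

```python
import json

def column_name(field, value):
    """Map a JSON field to its typed column name per the
    {field}_{type} convention described above."""
    if isinstance(value, bool):          # booleans become "true"/"false" strings
        return f"{field}_str"            # (check before int: bool is an int subclass)
    if isinstance(value, int):
        return f"{field}_int"
    if isinstance(value, float):
        return f"{field}_float"
    return f"{field}_str"                # strings, objects, arrays: raw string column

row = json.loads('{"level": "info", "status": 200, "latency_ms": 1.5, "enabled": true}')
columns = {column_name(k, v): v for k, v in row.items()}
# sorted(columns) == ["enabled_str", "latency_ms_float", "level_str", "status_int"]
```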

Special columns added by the scanner:

Column        Type    Description
_file_str     string  Absolute path of the source file (file inputs only).
_raw_str      string  Original JSON line (only when keep_raw: true).
_time_ns_int  int64   Timestamp from CRI header in nanoseconds (CRI inputs only).
_stream_str   string  CRI stream name (stdout/stderr).

Built-in UDFs

Function                               Signature                                 Description
int(expr)                              int(any) → int64                          Cast any value to int64. Returns NULL on failure.
float(expr)                            float(any) → float64                      Cast any value to float64. Returns NULL on failure.
grok(pattern, input)                   grok(utf8, utf8) → utf8                   Apply a Grok pattern to input and return the first capture as JSON.
regexp_extract(input, pattern, group)  regexp_extract(utf8, utf8, int64) → utf8  Return capture group group from a regex match.

Examples:

-- Cast a string column to int
SELECT int(status_str) AS status_int FROM logs

-- Extract a field with Grok
SELECT grok('%{IP:client} %{WORD:method} %{URIPATHPARAM:path}', message_str) AS parsed_str FROM logs

-- Extract a named group with regex
SELECT regexp_extract(message_str, 'user=([a-z]+)', 1) AS user_str FROM logs

-- Type-cast from environment-injected string
SELECT float(duration_str) AS duration_ms_float FROM logs

Enrichment tables

Enrichment tables are made available as SQL tables that can be joined in the transform query. They are declared under the top-level enrichment key.

enrichment:
  k8s:
    type: k8s_path

  host:
    type: host_info

  labels:
    type: static
    fields:
      environment: production
      region: us-east-1

k8s_path enrichment

Parses Kubernetes pod log paths (e.g. /var/log/pods/<namespace>_<pod>_<uid>/<container>/) to extract metadata.

SELECT l.level_str, l.message_str, k.namespace, k.pod_name, k.container_name
FROM logs l
JOIN k8s k ON l._file_str = k.log_path_prefix

Columns exposed by k8s:

Column           Description
log_path_prefix  Directory prefix used as join key.
namespace        Kubernetes namespace.
pod_name         Pod name.
pod_uid          Pod UID.
container_name   Container name.
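The path parsing behind this table can be sketched in Python. A simplified illustration, not the actual enrichment code:

```python
def parse_pod_log_path(path):
    """Extract Kubernetes metadata from a pod log path of the form
    /var/log/pods/<namespace>_<pod>_<uid>/<container>/<n>.log.

    A sketch of what the k8s_path enrichment derives; the real table
    also exposes log_path_prefix as the join key."""
    parts = path.split("/")
    pod_dir = parts[4]                       # "<namespace>_<pod>_<uid>"
    namespace, pod_name, pod_uid = pod_dir.split("_", 2)
    return {
        "namespace": namespace,
        "pod_name": pod_name,
        "pod_uid": pod_uid,
        "container_name": parts[5],
    }

meta = parse_pod_log_path("/var/log/pods/production_api-7c9d_0b1c2d3e/api/0.log")
```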

host_info enrichment

Exposes the hostname of the machine running logfwd.

Column    Description
hostname  System hostname.

static enrichment

A table with one row containing user-defined label columns.

enrichment:
  labels:
    type: static
    fields:
      environment: production
      cluster: us-east-1
      tier: backend

SELECT l.*, lbl.environment, lbl.cluster
FROM logs l CROSS JOIN labels lbl

Server configuration

The optional server block controls the diagnostics server and observability settings.

Field                  Type     Default  Description
diagnostics            string   none     host:port to listen for HTTP diagnostics. See Diagnostics API.
log_level              string   info     Log verbosity. One of error, warn, info, debug, trace.
metrics_endpoint       string   none     OTLP endpoint for periodic metrics push, e.g. http://otel-collector:4318.
metrics_interval_secs  integer  60       Push interval for OTLP metrics in seconds.

server:
  diagnostics: 0.0.0.0:9090
  log_level: info
  metrics_endpoint: http://otel-collector:4318
  metrics_interval_secs: 30

Diagnostics API

When server.diagnostics is configured, logfwd exposes an HTTP API for monitoring and troubleshooting.

Route           Method  Description
/               GET     Dashboard HTML (visual explorer for metrics and traces).
/health         GET     Liveness probe. Returns 200 OK if the server is running.
/ready          GET     Readiness probe. Returns 200 OK once pipelines are initialized.
/api/pipelines  GET     Per-pipeline counters (lines, bytes, errors, batches, stage timing).
/api/stats      GET     Aggregate process stats (uptime, RSS, CPU, aggregate line counts).
/api/config     GET     Currently loaded YAML configuration and its file path.
/api/logs       GET     Recent log lines from logfwd's own stderr (ring buffer).
/api/history    GET     Time-series data (1-hour window) for dashboard charts.
/api/traces     GET     Recent batch processing spans for detailed latency analysis.
Note: The /metrics (Prometheus) endpoint was removed in favor of /api/pipelines. It returns 410 Gone. The /api/system route mentioned in some older documentation does not exist.


Storage configuration

The optional storage block controls where logfwd persists state (checkpoints, disk queue).

Field     Type    Default  Description
data_dir  string  none     Directory for state files. Created if it does not exist.

storage:
  data_dir: /var/lib/logfwd

Environment variable substitution

Any value in the config file can reference an environment variable with ${VAR}:

output:
  type: otlp
  endpoint: ${OTEL_COLLECTOR_ADDR}

server:
  metrics_endpoint: ${METRICS_PUSH_URL}

If the variable is not set, the placeholder is left as-is (no error).
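The substitution rule is easy to emulate. A Python sketch of the documented behavior:

```python
import os
import re

def expand_env(text):
    """Replace ${VAR} with the environment value when set; leave the
    placeholder untouched when the variable is missing (no error),
    matching the documented behavior."""
    def sub(m):
        return os.environ.get(m.group(1), m.group(0))
    return re.sub(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}", sub, text)

os.environ["OTEL_COLLECTOR_ADDR"] = "otel-collector:4317"
cfg = "endpoint: ${OTEL_COLLECTOR_ADDR}\nmetrics: ${METRICS_PUSH_URL}\n"
print(expand_env(cfg))
# endpoint: otel-collector:4317
# metrics: ${METRICS_PUSH_URL}
```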


Complete example

pipelines:
  app:
    inputs:
      - name: pod_logs
        type: file
        path: /var/log/pods/**/*.log
        format: cri
    transform: |
      SELECT
        l.level_str,
        l.message_str,
        l.status_int,
        k.namespace,
        k.pod_name,
        k.container_name,
        lbl.environment
      FROM logs l
      LEFT JOIN k8s k ON l._file_str = k.log_path_prefix
      CROSS JOIN labels lbl
      WHERE l.level_str IN ('ERROR', 'WARN')
        OR l.status_int >= 500
    outputs:
      - name: collector
        type: otlp
        endpoint: ${OTEL_ENDPOINT}
        protocol: grpc
        compression: zstd
      - name: debug
        type: stdout
        format: console

enrichment:
  k8s:
    type: k8s_path
  labels:
    type: static
    fields:
      environment: ${ENVIRONMENT}
      cluster: ${CLUSTER_NAME}

server:
  diagnostics: 0.0.0.0:9090
  log_level: info
  metrics_endpoint: ${OTEL_ENDPOINT}
  metrics_interval_secs: 60

storage:
  data_dir: /var/lib/logfwd

Input Types

File

Tail one or more log files with glob pattern support.

input:
  type: file
  path: /var/log/pods/**/*.log
  format: cri      # cri | json | raw | auto

  • Glob re-scanning: New files matching the pattern are discovered automatically (every 5s).
  • Rotation handling: Detects file rotation (rename + create) and switches to the new file. Drains remaining data from the old file before switching.
  • Formats: CRI (Kubernetes container runtime), JSON (newline-delimited), raw (plain text, each line becomes {"_raw": "..."}).
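Rename-and-create rotation detection can be illustrated with inode comparison. A toy sketch (POSIX only), not logfwd's implementation:

```python
import os

def rotated(path, known_inode):
    """Return True when the file at `path` no longer has the inode we were
    tailing, i.e. it was renamed away and recreated.

    A sketch of rename+create rotation detection; a real agent would keep
    the old file descriptor open and drain it before switching."""
    try:
        return os.stat(path).st_ino != known_inode
    except FileNotFoundError:
        return True

with open("svc.log", "w") as f:
    f.write("old\n")
inode = os.stat("svc.log").st_ino

os.rename("svc.log", "svc.log.1")   # rotation: rename the old file...
with open("svc.log", "w") as f:     # ...and create a fresh one
    f.write("new\n")

was_rotated = rotated("svc.log", inode)
```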

Generator

Emit synthetic JSON log lines for benchmarking and pipeline testing. No external data source is required.

Field   Type    Required  Default      Description
listen  string  No        (unlimited)  Target events per second, e.g. "50000". Omit for maximum throughput.

input:
  type: generator
  listen: "50000"   # ~50,000 events/sec; omit for unlimited

Use --generate-json <n> <file> on the CLI to write a fixed number of lines to a file instead.

UDP

Receive log lines on a UDP socket.

input:
  type: udp
  listen: 0.0.0.0:514
  format: syslog

TCP

Accept log lines on a TCP socket.

input:
  type: tcp
  listen: 0.0.0.0:5140
  format: json

OTLP

Receive OTLP log records from another agent or SDK.

input:
  type: otlp

Output Types

OTLP

Send logs as OpenTelemetry protobuf over HTTP or gRPC.

output:
  type: otlp
  endpoint: https://collector:4318
  protocol: http       # http | grpc
  compression: zstd    # zstd | none (gzip not yet supported)
  auth:
    bearer_token: "${OTEL_TOKEN}"

HTTP (JSON Lines)

POST newline-delimited JSON to any HTTP endpoint.

output:
  type: http
  endpoint: https://logging-service:9200
  auth:
    headers:
      X-API-Key: "${API_KEY}"

Stdout

Print to stdout for debugging/testing.

output:
  type: stdout
  format: console    # console | json | text

  • console: Colored, human-readable (timestamp, level, message, key=value pairs)
  • json: One JSON object per line (machine-parseable)
  • text: Raw text (uses _raw column if available)

Elasticsearch

Ship logs to Elasticsearch via the Bulk API.

output:
  type: elasticsearch
  endpoint: https://es-cluster:9200
  index: logs             # default: "logs"
  compression: gzip       # gzip | none (optional)
  request_mode: buffered  # buffered | streaming (optional, default buffered)

  • Bulk API: Per-document error handling with automatic retries.
  • Batch splitting: Large payloads are split automatically to stay within Elasticsearch limits.
  • Compression: Optional gzip compression for reduced network usage.
  • Streaming mode: Experimental chunked HTTP request bodies for benchmarking against hosted/serverless clusters.

Field         Type    Required  Default   Description
endpoint      string  Yes       —         Elasticsearch URL
index         string  No        logs      Target index name
compression   string  No        none      gzip or none
request_mode  string  No        buffered  buffered or experimental streaming (streaming currently requires compression: none)
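The batch-splitting behavior can be sketched as follows: a simplified illustration that splits on a byte budget, whereas the real output also handles per-document errors and retries:

```python
def split_bulk(docs, max_bytes):
    """Split newline-delimited JSON documents into payloads that each stay
    under `max_bytes`, the way a bulk shipper must to respect server
    request-size limits. An oversized single document gets its own payload."""
    payloads, current, size = [], [], 0
    for doc in docs:
        doc_size = len(doc) + 1            # +1 for the trailing newline
        if current and size + doc_size > max_bytes:
            payloads.append("\n".join(current) + "\n")
            current, size = [], 0
        current.append(doc)
        size += doc_size
    if current:
        payloads.append("\n".join(current) + "\n")
    return payloads

payloads = split_bulk(['{"a":1}', '{"b":22}', '{"c":333}'], max_bytes=20)
```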

Loki

Push logs to Grafana Loki with automatic label grouping.

output:
  type: loki
  endpoint: http://loki:3100

Field     Type    Required  Description
endpoint  string  Yes       Loki push URL

Multiple outputs (fan-out)

outputs:
  - name: collector
    type: otlp
    endpoint: https://collector:4318
  - name: debug
    type: stdout
    format: console

SQL Transforms

logfwd uses Apache DataFusion to run SQL queries on your log data. Every log line becomes a row in a virtual logs table.

Column naming

The JSON scanner creates columns using bare field names with native Arrow types:

  • level — Utf8View (string)
  • status — Int64 (integer)
  • latency_ms — Float64 (float)

Use bare names directly in SQL:

SELECT * FROM logs WHERE level = 'ERROR'

When a field has mixed types across rows (e.g., status is sometimes an int, sometimes a string), the builder emits a StructArray conflict column (status: Struct { int: Int64, str: Utf8View }). Before SQL execution, conflict columns are normalized to flat Utf8 columns via COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str). Use int(status) or float(status) for numeric operations on these columns.
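The normalization step can be illustrated in Python: a sketch of the COALESCE logic over a conflict column, taking the first non-null variant per row and rendering it as text:

```python
def normalize_conflict(struct_rows):
    """Flatten a mixed-type conflict column to strings the way the
    documented COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str)
    does: first non-null variant per row, rendered as text."""
    out = []
    for row in struct_rows:
        for key in ("int", "float", "str"):
            if row.get(key) is not None:
                out.append(str(row[key]))
                break
        else:
            out.append(None)   # all variants null: the row stays NULL
    return out

# A "status" column that was an int in some rows and a string in others.
mixed = [{"int": 200}, {"str": "timeout"}, {"int": None, "str": None}]
flat = normalize_conflict(mixed)   # ["200", "timeout", None]
```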

Custom UDFs

logfwd registers the following custom scalar functions in addition to DataFusion’s built-in functions.

json(column, key) — extract a string value from JSON

json(column: Utf8, key: Utf8) → Utf8

Extracts a field from a raw JSON string column and returns its value as a string. Integer and float values are coerced to their string representation. Returns NULL when the key is not present.

SELECT json(_raw, 'host') AS host FROM logs

json_int(column, key) — extract an integer value from JSON

json_int(column: Utf8, key: Utf8) → Int64

Extracts a field from a raw JSON string column and returns its value as a 64-bit integer. Returns NULL when the key is not present or the value is not a JSON number (e.g. a quoted string "200" returns NULL).

SELECT json_int(_raw, 'status') AS status FROM logs
  WHERE json_int(_raw, 'status') >= 500

json_float(column, key) — extract a float value from JSON

json_float(column: Utf8, key: Utf8) → Float64

Extracts a field from a raw JSON string column and returns its value as a 64-bit float. Integer values are promoted to float. Returns NULL when the key is not present or the value is a quoted string.

SELECT json_float(_raw, 'duration') AS duration_sec FROM logs
  WHERE json_float(_raw, 'duration') > 1.5
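The type rules of these extractors can be mimicked in Python. This is an illustration of the documented semantics, not the actual UDF code:

```python
import json

def json_int(raw, key):
    """Mirror the documented json_int() semantics: return the value only
    when it is a JSON number; a missing key or a quoted string like "200"
    yields NULL (None here)."""
    value = json.loads(raw).get(key)
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    return int(value)

def json_str(raw, key):
    """Mirror json(): numbers are coerced to their string representation,
    a missing key yields NULL."""
    value = json.loads(raw).get(key)
    if value is None:
        return None
    return str(value)

line = '{"status": 200, "code": "200", "host": "web-1"}'
```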

grok(column, pattern) — grok pattern extraction

grok(column: Utf8, pattern: Utf8) → Struct

Applies a Logstash-style grok pattern to each row and returns a struct with one Utf8 field per named capture (%{PATTERN:name}). Non-matching rows produce NULL struct fields.

Built-in patterns: IP, IPV4, IPV6, NUMBER, INT, BASE10NUM, WORD, NOTSPACE, SPACE, DATA, GREEDYDATA, QUOTEDSTRING, UUID, MAC, URIPATH, URIPATHPARAM, URI, TIMESTAMP_ISO8601, DATE, TIME, LOGLEVEL, HOSTNAME, EMAILADDRESS.

SELECT grok(message, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}')
  FROM logs
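Under the hood a grok pattern is essentially a regex with named capture groups. A minimal Python sketch with a tiny pattern subset (the regexes here are simplified assumptions, not the library's definitions):

```python
import re

# A small subset of the built-in patterns; the real set is much larger.
GROK_PATTERNS = {
    "WORD": r"\w+",
    "NUMBER": r"\d+(?:\.\d+)?",
    "URIPATH": r"/[^\s?]*",
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
}

def grok_to_regex(pattern):
    """Compile a grok pattern like %{WORD:method} into a regex with
    named capture groups, a sketch of what grok() applies per row."""
    def sub(m):
        name, capture = m.group(1), m.group(2)
        return f"(?P<{capture}>{GROK_PATTERNS[name]})"
    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", sub, pattern))

rx = grok_to_regex("%{WORD:method} %{URIPATH:path} %{NUMBER:status}")
m = rx.match("GET /api/v1/users 200")
# m.groupdict() == {"method": "GET", "path": "/api/v1/users", "status": "200"}
```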

regexp_extract(column, pattern, group) — regex capture group extraction

regexp_extract(column: Utf8, pattern: Utf8, group: Int64) → Utf8

Returns the capture group at the given index (1-based), or the full match when group is 0. Returns NULL when the pattern does not match or the group index is out of range.

SELECT regexp_extract(message, 'status=(\d+)', 1) AS status_code FROM logs

geo_lookup(ip) — GeoIP enrichment

geo_lookup(ip: Utf8) → Struct

Looks up an IP address in the configured MaxMind database and returns a struct with fields: country_code (Utf8), country_name (Utf8), city (Utf8), region (Utf8), latitude (Float64), longitude (Float64), asn (Int64), org (Utf8). All fields are nullable — private/unresolvable IPs return NULL. Requires a geo_database enrichment in the pipeline config.

Use get_field() to extract individual fields:

enrichment:
  - type: geo_database
    path: /data/GeoLite2-City.mmdb
transform: |
  SELECT *, get_field(geo, 'city') AS city, get_field(geo, 'country_code') AS country
  FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)

Examples

Filter by severity

transform: "SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')"

Drop fields

transform: "SELECT * EXCEPT (request_id, trace_id) FROM logs"

Rename fields

transform: "SELECT duration_ms AS latency_ms, * EXCEPT (duration_ms) FROM logs"

Computed fields

transform: |
  SELECT *,
    CASE WHEN duration_ms > 200 THEN 'slow' ELSE 'fast' END AS speed
  FROM logs

Aggregation

transform: |
  SELECT level, COUNT(*) as count, AVG(duration_ms) as avg_latency
  FROM logs
  GROUP BY level

Deployment Guide

This guide covers running logfwd in production environments.


Docker — standalone container

Build the image

docker build -t logfwd:latest .

The multi-stage Dockerfile at the repository root compiles a statically-linked release binary and copies it into a minimal debian:bookworm-slim image.

Run with a config file

Create config.yaml:

input:
  type: file
  path: /var/log/app/*.log
  format: json

output:
  type: otlp
  endpoint: otel-collector:4317
  protocol: grpc

server:
  diagnostics: 0.0.0.0:9090

Run the container, mounting the log directory and config:

docker run -d \
  --name logfwd \
  -v /var/log:/var/log:ro \
  -v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
  -p 9090:9090 \
  logfwd:latest \
  --config /etc/logfwd/config.yaml

Environment variable substitution

Pass secrets and environment-specific values via environment variables instead of baking them into the config file:

output:
  type: otlp
  endpoint: ${OTEL_ENDPOINT}

docker run -d \
  -e OTEL_ENDPOINT=otel-collector:4317 \
  -v /var/log:/var/log:ro \
  -v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
  logfwd:latest --config /etc/logfwd/config.yaml

Kubernetes — DaemonSet

A DaemonSet is the recommended way to deploy logfwd in a Kubernetes cluster. Each node runs one logfwd pod that reads container logs from /var/log on the host.

A ready-to-use manifest is provided at deploy/daemonset.yml.

Minimal DaemonSet

apiVersion: v1
kind: Namespace
metadata:
  name: collectors
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: logfwd
  namespace: collectors
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logfwd-config
  namespace: collectors
data:
  config.yaml: |
    input:
      type: file
      path: /var/log/pods/**/*.log
      format: cri

    transform: |
      SELECT
        level,
        message,
        _timestamp,
        _stream
      FROM logs
      WHERE level != 'DEBUG'

    output:
      type: otlp
      endpoint: ${OTEL_ENDPOINT}
      protocol: grpc
      compression: zstd

    server:
      diagnostics: 0.0.0.0:9090
      log_level: info
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: logfwd
  namespace: collectors
  labels:
    app: logfwd
spec:
  selector:
    matchLabels:
      app: logfwd
  template:
    metadata:
      labels:
        app: logfwd
    spec:
      serviceAccountName: logfwd
      tolerations:
        - operator: Exists  # run on all nodes including control-plane
      containers:
        - name: logfwd
          image: logfwd:latest
          imagePullPolicy: IfNotPresent
          args:
            - --config
            - /etc/logfwd/config.yaml
          env:
            - name: OTEL_ENDPOINT
              value: otel-collector.monitoring.svc.cluster.local:4317
          ports:
            - name: diagnostics
              containerPort: 9090
          resources:
            requests:
              cpu: "250m"
              memory: "128Mi"
            limits:
              cpu: "1"
              memory: "512Mi"
          volumeMounts:
            - name: varlog
              mountPath: /var/log
              readOnly: true
            - name: config
              mountPath: /etc/logfwd
              readOnly: true
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: config
          configMap:
            name: logfwd-config

Apply it:

kubectl apply -f deploy/daemonset.yml
kubectl -n collectors rollout status daemonset/logfwd

Kubernetes metadata enrichment

Use the k8s_path enrichment table to attach namespace, pod, and container labels to every log record:

enrichment:
  k8s:
    type: k8s_path

transform: |
  SELECT
    l.level,
    l.message,
    k.namespace,
    k.pod_name,
    k.container_name
  FROM logs l
  LEFT JOIN k8s k ON l._source_path = k.log_path_prefix

Namespace filtering

To collect logs only from specific namespaces, filter in the transform:

transform: |
  SELECT l.*, k.namespace, k.pod_name, k.container_name
  FROM logs l
  LEFT JOIN k8s k ON l._source_path = k.log_path_prefix
  WHERE k.namespace IN ('production', 'staging')

Scraping the diagnostics endpoint with Prometheus

Expose port 9090 in the pod spec and add a Prometheus scrape annotation:

metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9090"
    prometheus.io/path: "/api/pipelines"

OTLP collector integration

logfwd sends log records as OTLP protobuf. Any OpenTelemetry-compatible collector can receive them.

OpenTelemetry Collector

Add an otlp receiver to your collector config and enable the logs pipeline:

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

exporters:
  debug:
    verbosity: normal
  otlphttp/loki:
    endpoint: http://loki:3100/otlp

service:
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [debug, otlphttp/loki]

Point logfwd at the collector:

output:
  type: otlp
  endpoint: otel-collector:4317
  protocol: grpc
  compression: zstd

Grafana Alloy / Agent

Grafana Alloy can receive OTLP logs and forward them to Loki or Tempo. Configure an otelcol.receiver.otlp component and connect it to your exporter pipeline.


Resource sizing guidelines

logfwd is designed to process logs on a single CPU core. The pipeline runs as a set of blocking OS threads — one per input plus shared coordinator threads.

Baseline

| Scenario | CPU | Memory |
|---|---|---|
| Quiet node (< 1 MB/s) | 50m | 64Mi |
| Typical node (1–10 MB/s) | 250m | 128Mi |
| High-throughput node (> 10 MB/s) | 500m – 1 CPU | 256Mi |

Memory breakdown

| Component | Typical size |
|---|---|
| Arrow RecordBatch (per batch, 8 KB read) | ~512 Ki |
| DataFusion query plan | ~4 Mi |
| OTLP request buffer | ~2 Mi |
| Enrichment tables | < 1 Mi |
| Per-pipeline overhead | ~16 Mi |

Tuning tips

  • Reduce memory: avoid keep_raw: true (disabled by default) — it stores the full JSON line and accounts for up to 65 % of table memory.
  • Reduce CPU: use a WHERE clause in the transform to drop unwanted records early.
  • Multiple pipelines: each pipeline occupies its own thread. Add CPU budget proportionally.

Kubernetes resource example

resources:
  requests:
    cpu: "250m"
    memory: "128Mi"
  limits:
    cpu: "1"
    memory: "512Mi"

Set the limit higher than the request so logfwd can burst during log spikes without being OOM-killed.


Validating before deploy

Use --validate to parse and validate the config without starting the pipeline:

logfwd --config config.yaml --validate

Use --dry-run to build all pipeline objects without starting them (catches errors such as SQL syntax issues):

logfwd --config config.yaml --dry-run

Both commands exit 0 on success and print an error to stderr on failure.

Docker Deployment

Quick start

docker run -v /var/log:/var/log:ro \
  -v ./config.yaml:/etc/logfwd/config.yaml:ro \
  ghcr.io/strawgate/memagent:latest \
  --config /etc/logfwd/config.yaml

Dockerfile

The release workflow builds multi-arch images for linux/amd64 and linux/arm64 using a distroless base image.

See Dockerfile and .github/workflows/release.yml for details.

Monitoring & Diagnostics

Enable the diagnostics server in your config:

server:
  diagnostics: 0.0.0.0:9090

Endpoints

| Endpoint | Description |
|---|---|
| GET /health | Liveness probe (uptime, version) |
| GET /ready | Readiness probe (200 once initialized) |
| GET /api/pipelines | Detailed JSON with per-stage metrics |
| GET /api/stats | Flattened JSON for polling/benchmarks |
| GET /api/config | View active YAML configuration |
| GET /api/logs | View recent log lines from stderr |
| GET /api/history | Time-series data for dashboard charts |
| GET /api/traces | Detailed latency spans for recent batches |
| GET / | HTML dashboard |

Key metrics

| Metric | Description |
|---|---|
| logfwd_input_lines_total | Lines read per input |
| logfwd_transform_lines_in | Lines entering SQL transform |
| logfwd_transform_lines_out | Lines after filtering |
| logfwd_stage_seconds_total | Time per stage (scan, transform, output) |
| logfwd_flush_reason_total | Flush triggers (size vs timeout) |

OTLP metrics push

server:
  metrics_endpoint: http://otel-collector:4318
  metrics_interval_secs: 60

Architecture

Data flow

Sources (file / TCP / UDP / OTLP receiver)
    │
    │  each source produces Bytes independently
    ▼
Format Parser (CRI / JSON / Raw)  ─per source─
    │  strips CRI timestamp/stream prefix, accumulates NDJSON
    ▼
Scanner  ─per source─
    │  one pass classifies entire buffer via ChunkIndex
    │  walks structural positions directly into Arrow columns
    │  injects _resource_* columns from source metadata
    ▼
RecordBatch per source
    │
    ├─→ [if queue.mode: pre-transform] write Arrow IPC segment → ack source
    │
    ├─→ register as partitions of `logs` MemTable
    │   (DataFusion concatenates partitions during query)
    ▼
SQL Transform (DataFusion)
    │  user SQL: SELECT, WHERE, GROUP BY + UDFs
    │  enrichment tables available via JOIN
    │  _resource_* columns flow through like any other column
    ▼
Post-transform RecordBatch
    │
    ├─→ [if queue.mode: post-transform] write Arrow IPC segment → ack source
    │
    ▼
Output Sinks (OTLP / JSON lines / HTTP / stdout)
    │  in-memory fan-out via bounded channels
    │  group rows by _resource_* columns for OTLP ResourceLogs

Persistence

Arrow IPC is the universal segment format. Every persistence point writes Arrow IPC File format with atomic seal, and every persistence point can target local disk or object storage (S3/GCS/Azure Blob).

Queue mode is configurable per pipeline:

pipeline:
  queue:
    mode: pre-transform   # or post-transform, or none
    storage: s3://my-bucket/logfwd/pipeline-a/
    max_bytes: 10GB
    max_age: 24h

pre-transform: Source segments are the queue. Each source writes its own Arrow IPC segments after scanning. Outputs replay from source segments and re-run the transform. Raw data is re-queryable with different SQL. One copy of data. Enrichment data may be stale on replay (_resource_* columns are always correct; enrichment table data reflects query-time state, not ingest-time).

post-transform: Shared post-transform queue per pipeline. Transform runs once, result is persisted. All outputs share one queue with independent cursors. Enrichment baked in at ingest time — correct even on replay days later. Can’t re-query with different SQL. Second copy of data (but usually smaller due to filtering).

none (default): Pure in-memory fan-out. Transform runs once, RecordBatches passed to outputs via bounded channels. Batches drop on output failure. Lowest resource usage.

End-to-end ack

Sources must NOT acknowledge to their upstream until persistence confirms durability. The ack chain:

Scanner produces RecordBatch
  → FileWriter writes to .tmp
  → fsync + rename (segment sealed)
  → Ack source: "data through offset X is durable"
  → Source advances checkpoint

Ack latency is scanner + IPC write + fsync, typically 5-15ms. Comparable to Kafka’s acks=all (3-15ms).

File sources don’t need the ack latency to be fast — the log file is already durable. The checkpoint just tracks the file offset.

Network sources see the full ack latency as their response time.

When queue.mode: none: No persistence, no ack-after-write. Source checkpoints advance after in-memory fan-out. Data can be lost on crash.

Output delivery

Real-time path (all modes): Transform runs once per batch cycle, result fans out to all outputs via bounded channels.

Replay (when queue is enabled): If an output falls behind or restarts, it replays from the queue. With pre-transform mode, replay re-runs the transform. With post-transform mode, replay reads transformed segments directly.

Cursor tracking: Each output tracks { segment_id, batch_offset }. Persisted on each ack. On restart, resume from last position.

Retention: TTL (max_segment_age) + size (max_disk_bytes). When either limit is hit, oldest segments are evicted regardless of cursor positions. Outputs pointing at evicted segments see a gap (tracked in segments_dropped metric).
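The eviction rule above — drop the oldest segments whenever either the age or size limit is exceeded, regardless of output cursors — can be sketched in std-only Rust. The `Segment` type and `evict` function here are illustrative, not the real logfwd internals:

```rust
use std::time::{Duration, SystemTime};

// Illustrative segment record: id, size on disk, creation time.
#[derive(Debug, PartialEq)]
struct Segment {
    id: u64,
    bytes: u64,
    created: SystemTime,
}

/// Evict oldest-first while either retention limit is violated.
/// Returns how many segments were dropped (the segments_dropped metric).
fn evict(segments: &mut Vec<Segment>, max_age: Duration, max_bytes: u64, now: SystemTime) -> u64 {
    let mut dropped = 0;
    // Segments are ordered oldest-first, so eviction pops from the front.
    while let Some(oldest) = segments.first() {
        let too_old = now.duration_since(oldest.created).unwrap_or_default() > max_age;
        let total: u64 = segments.iter().map(|s| s.bytes).sum();
        if too_old || total > max_bytes {
            segments.remove(0);
            dropped += 1;
        } else {
            break;
        }
    }
    dropped
}
```

Note that eviction ignores cursors by design: an output pointing at an evicted segment observes a gap rather than blocking retention.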

Delivery semantics: At-least-once when queue is enabled. Best-effort when queue is none.

Multi-source pipeline

Each source scans independently and writes its own Arrow IPC segments. At transform time, the pipeline reads the latest segments from each source and registers them as partitions of the logs MemTable. DataFusion concatenates partitions during query execution.

This means:

  • Each source can have a different ScanConfig (different wanted_fields from predicate pushdown)
  • Schema differences between sources are handled by DataFusion’s schema merging (missing columns are null)
  • The StreamingBuilder’s Bytes lifetime is per-source
  • A source with no data in a cycle contributes no partition

Resource metadata as columns

Source identity and resource attributes are carried as _resource_* prefixed columns (e.g., _resource_k8s_pod_name, _resource_k8s_namespace, _resource_service_name). These are injected during scanning based on the source’s configuration.

This design:

  • Survives SQL transforms naturally (they’re just columns)
  • Persists in the Arrow IPC segments (always correct, unlike enrichment)
  • Enables OTLP output to group rows by resource (group-by on _resource_* columns, one ResourceLogs per distinct combination)
  • Uses dictionary encoding for efficiency (same pod name on every row from one source costs ~one entry)
  • Output sinks exclude _resource_* columns from the payload (same pattern as _raw)
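The dictionary-encoding point above is why a repeated pod name on every row costs roughly one entry. A minimal sketch of the idea in std-only Rust (Arrow's actual `DictionaryArray` layout differs, but the space argument is the same):

```rust
use std::collections::HashMap;

/// Dictionary-encode a column: each distinct value is stored once,
/// and every row holds only a small integer key into the dictionary.
fn dictionary_encode(values: &[&str]) -> (Vec<u32>, Vec<String>) {
    let mut lookup: HashMap<&str, u32> = HashMap::new();
    let mut dict: Vec<String> = Vec::new();
    let keys: Vec<u32> = values
        .iter()
        .map(|&v| {
            *lookup.entry(v).or_insert_with(|| {
                dict.push(v.to_string());
                (dict.len() - 1) as u32
            })
        })
        .collect();
    (keys, dict)
}
```

For a `_resource_k8s_pod_name` column fed by one source, every row carries the same value, so the dictionary holds a single string no matter how many rows the batch has.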

Column naming conventions

| Column | Purpose | Example |
|---|---|---|
| {field} | Bare name, native Arrow type | message (Utf8View), status (Int64) |
| {field} (conflict) | StructArray with typed children | status: Struct { int: Int64, str: Utf8View } |
| _raw | Raw input line (optional) | _raw |
| _timestamp | CRI timestamp (RFC 3339 string) | _timestamp |
| _stream | CRI stream name | _stream |
| _resource_* | Source/resource metadata | _resource_k8s_pod_name |

Type conflicts produce a single StructArray column with typed children, not separate flat columns.

Arrow IPC segment format

Format: Arrow IPC File format with atomic seal.

Write path: FileWriter writes to .tmp file. On seal: finish() (writes footer) → fsync → atomic rename to final path. Readers never see incomplete files.
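The write-then-seal sequence can be sketched with std only (the real writer is Arrow's `FileWriter`; this hypothetical `seal` just shows the durability ordering):

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

/// Write a payload to `<name>.tmp`, fsync, then atomically rename to
/// `<name>`. Readers listing the directory never observe a partial file.
fn seal(dir: &Path, name: &str, payload: &[u8]) -> std::io::Result<()> {
    let tmp = dir.join(format!("{name}.tmp"));
    let sealed = dir.join(name);
    let mut f = File::create(&tmp)?;
    f.write_all(payload)?; // FileWriter::finish() would write the IPC footer here
    f.sync_all()?;         // fsync: data is durable before we publish the name
    fs::rename(&tmp, &sealed)?; // atomic publish on POSIX filesystems
    Ok(())
}
```

The rename is the commit point: a crash before it leaves only a `.tmp` orphan, which startup deletes.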

Segment lifecycle:

  1. Pipeline writes batch(es) to .tmp file via FileWriter
  2. At size threshold (64 MB) or time threshold: seal segment
  3. On startup: delete orphaned .tmp files

Storage abstraction: All persistence writes go through a SegmentStore trait with implementations for local filesystem and object storage (S3/GCS/Azure Blob). The same segment format and code path is used regardless of storage backend.

Deployment model

logfwd scales by running more instances. One binary, one pipeline per instance. S3 is the coordination layer between instances.

Single instance: Source → Scanner → Transform → Output, all in one process. Predicate pushdown works. In-memory fan-out. Simple.

Scaled out: Multiple instances with different configs, sharing S3 paths. Each instance is a full logfwd binary.

Instance A (edge collector):
  file source → scanner → write Arrow IPC to s3://bucket/raw/

Instance B (central processor):
  s3 source (reads s3://bucket/raw/) → transform → output sinks
  # or: → write to s3://bucket/transformed/ for another instance

Instance C (dedicated sender):
  s3 source (reads s3://bucket/transformed/) → output sinks

No custom RPC, no cluster coordination. Arrow IPC on S3 is the universal interface. Any instance can read any other’s segments.

Scanner architecture

The scanner is the performance-critical path. It has two stages:

Stage 1 — Chunk classification (chunk_classify.rs): Process the entire NDJSON buffer in 64-byte blocks. For each block, find all quote and backslash positions, compute an escape-aware real-quote bitmask, and build a string-interior mask. Output: ChunkIndex with pre-computed bitmasks.

Stage 2 — Field extraction (scanner.rs): A scalar state machine walks top-level JSON objects. For each field, it resolves the key to an index (HashMap, once per field per batch) and routes the value to the builder via append_*_by_idx. String scanning uses the pre-computed ChunkIndex for O(1) closing-quote lookup.

The scan loop is generic over the ScanBuilder trait:

  • StreamingBuilder (via Scanner::scan_detached): builds detached StringArray columns. For the persistence path (Arrow IPC segments).
  • StreamingBuilder (via Scanner::scan): stores (row, offset, len) views into a bytes::Bytes buffer. Builds StringViewArray columns with zero copies. 20% faster. For real-time hot path when persistence is disabled.

Async pipeline

The pipeline runs on a tokio multi-thread runtime. Key components:

  • Sources implement async fn run(&mut self, ctx: &mut SourceContext) (Arroyo-style source-owns-loop). File sources wrap FileTailer via spawn_blocking.
  • Scanner runs on spawn_blocking (pure CPU, ~4MB per call).
  • Transform is async fn execute() (DataFusion is natively async).
  • Sinks implement async fn send_batch(). HTTP-based sinks use async reqwest for connection pooling and timeouts.
  • Shutdown via CancellationToken (already implemented).

Crate map

| Crate | Purpose |
|---|---|
| logfwd | Binary. CLI, pipeline orchestration, OTel metrics. |
| logfwd-core | Scanner, builders, parsers, diagnostics, enrichment, OTLP encoder. |
| logfwd-config | YAML config deserialization and validation. |
| logfwd-transform | DataFusion SQL. UDFs: grok(), regexp_extract(), int(), float(). |
| logfwd-output | Output sinks + async Sink trait. OTLP, JSON lines, HTTP, stdout. |
| logfwd-bench | Criterion benchmarks. |

What’s implemented vs not yet

Implemented: file input, CRI/JSON/Raw parsing, zero-copy scanner, two builder backends (StreamingBuilder default), DataFusion SQL transforms (async), custom UDFs (grok, regexp_extract, int, float), enrichment (K8s path, host info, static labels), OTLP output, JSON lines output, stdout output, diagnostics server, OTel metrics, signal handling (SIGINT/SIGTERM via CancellationToken), graceful shutdown, async Sink trait.

Not yet: async pipeline runtime, async Source trait, Arrow IPC persistence (pre/post-transform), SegmentStore abstraction (local + S3/GCS), object storage upload, _resource_* column injection, OTLP resource grouping, output cursor tracking, TCP/UDP/OTLP input, Elasticsearch/Loki/Parquet output, file offset checkpointing, SQL rewriter, S3 source (for scaled-out deployment).

Zero-Copy Scanner

The scanner converts newline-delimited JSON into Apache Arrow RecordBatches using SIMD-accelerated structural classification.

How it works

  1. Stage 1 (SIMD): Classify the entire buffer in one pass. Find all quotes and backslashes using platform-specific SIMD: AVX2/SSE2 on x86_64, NEON on aarch64. This produces 64-bit bitmasks for O(1) string boundary lookups.

  2. Stage 2 (Scalar): Walk the JSON structure using the pre-computed bitmasks. Extract field names and values, detect types (int/float/string), and build Arrow columns directly.

Zero-copy mode

Scanner uses Arrow’s StringViewArray to create 16-byte views into the input buffer. String data is never copied — the original input buffer is shared via reference counting.
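The view mechanism can be sketched in std-only Rust. This is an illustration of the idea, not Arrow's actual `StringViewArray` layout (which inlines strings up to 12 bytes): each "view" is an offset/length pair into one refcounted buffer, so column construction copies no string bytes.

```rust
use std::sync::Arc;

// Hypothetical view: where a string lives inside the shared input buffer.
struct View { offset: usize, len: usize }

// A column is just views plus a refcounted handle on the original input.
struct ViewColumn {
    buffer: Arc<[u8]>,
    views: Vec<View>,
}

impl ViewColumn {
    fn get(&self, row: usize) -> &str {
        let v = &self.views[row];
        std::str::from_utf8(&self.buffer[v.offset..v.offset + v.len]).unwrap()
    }
}
```

Because every column clones the same `Arc`, the input buffer lives exactly as long as the batch that references it, with no per-string allocation.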

Field pushdown

The SQL transform is analyzed before scanning. If the query only references level and message, the scanner skips extracting all other fields. On wide data (20+ fields), this provides 2-3x throughput improvement.
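The pushdown idea reduces to set membership: given the columns the SQL query references, keep only those key/value pairs. A hedged sketch (the real scanner skips unwanted values byte-wise without ever materializing them):

```rust
use std::collections::HashSet;

/// Keep only the fields the downstream SQL actually references.
fn project<'a>(
    fields: &[(&'a str, &'a str)],
    wanted: &HashSet<&str>,
) -> Vec<(&'a str, &'a str)> {
    fields.iter().copied().filter(|(k, _)| wanted.contains(k)).collect()
}
```

On a 20-field record where the query needs two columns, 18 value parses and 18 builder appends are skipped per row, which is where the 2-3x gain on wide data comes from.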

Performance

| Dataset | Fields | Throughput |
|---|---|---|
| Narrow (3 fields) | 3 | 3.4M lines/sec |
| Simple (6 fields) | 6 | 2.0M lines/sec |
| Wide (20 fields) | 20 | 560K lines/sec |
| Wide (2 fields projected) | 20→2 | 1.4M lines/sec |

Performance

Throughput targets

logfwd targets 1M+ lines/sec on a single CPU core with CRI parsing and OTLP encoding.

Benchmark results

| Stage | Time (100K lines) | % of total |
|---|---|---|
| Scan (JSON→Arrow) | 21ms | 57% |
| Transform (SQL) | ~0ms | ~0% |
| OTLP encode | 9ms | 27% |
| zstd compress | 6ms | 16% |
| Total CPU | 36ms | 2.8M lines/sec |

Key optimizations

  • SIMD structural indexing: Classifies JSON structure in one vectorized pass
  • Zero-copy StringViewArray: No string copies in the hot path
  • Field pushdown: Scanner skips unused fields based on SQL analysis
  • Persistent zstd context: Compression context reused across batches
  • Connection pooling: HTTP agent reused for output requests
  • Block-in-place output: Overlaps I/O with scanning via tokio

Memory profile

At our default 4MB batch size (~23K lines):

  • Arrow RecordBatch overhead: ~2MB
  • Input buffer: 4MB (shared with StringView columns)
  • Total per batch: ~6MB

For 1M lines (stress test only):

  • Real RSS: ~205MB (not 926MB as get_array_memory_size() reports)
  • StringViewArray shares the input buffer across all string columns

Troubleshooting

This guide helps you diagnose and fix common problems with logfwd.


Common error messages

config validation error: pipeline '...' has no inputs

The named pipeline has an empty inputs list (or no input key in simple layout).

# Wrong — no inputs
pipelines:
  app:
    outputs:
      - type: stdout

# Fixed
pipelines:
  app:
    inputs:
      - type: file
        path: /var/log/app/*.log
    outputs:
      - type: stdout

config validation error: file input requires 'path'

A file input is missing the required path glob.

# Wrong
input:
  type: file

# Fixed
input:
  type: file
  path: /var/log/app/*.log

config validation error: otlp output requires 'endpoint'

An otlp, http, elasticsearch, or loki output is missing the required endpoint field.

# Wrong
output:
  type: otlp

# Fixed
output:
  type: otlp
  endpoint: otel-collector:4317

config validation error: cannot mix top-level input/output with pipelines

You used both the simple layout (top-level input/output) and the advanced layout (pipelines map) in the same file. Choose one.

config YAML error: ...

The YAML is malformed. Common causes:

  • Indentation errors (YAML requires consistent spaces, not tabs).
  • Missing quotes around values that contain special characters like :.
  • Multi-line SQL not using a block scalar (|):
# Fragile — inline SQL breaks YAML parsing as soon as it contains ': '
transform: SELECT level FROM logs WHERE level = 'ERROR'

# Fixed — use block scalar
transform: |
  SELECT level FROM logs WHERE level = 'ERROR'

error sending to OTLP endpoint: connection refused

logfwd cannot reach the configured OTLP collector.

  1. Verify the endpoint address and port are correct.

  2. Check that the collector is running:

    curl -v http://otel-collector:4318/v1/logs
    
  3. If using gRPC (protocol: grpc), ensure port 4317 is open; for HTTP use 4318.

  4. In Kubernetes, verify the service name resolves from within the pod:

    kubectl exec -n collectors <pod> -- nslookup otel-collector
    

error watching path: No such file or directory

The glob pattern in a file input matched no files and the base directory does not exist. Ensure the directory is mounted and the pattern is correct.

error watching path: permission denied

logfwd cannot read the log directory. In Kubernetes, confirm the varlog volume is mounted with readOnly: true at /var/log and that the container user has read access.


Diagnosing dropped or missing data

Step 1 — Check transform filter_drop_rate

Call the /api/pipelines endpoint (see Reading /api/pipelines below). Look at the transform.filter_drop_rate field. A value close to 1.0 means almost all records are being dropped by your WHERE clause.

Example: you intended to keep errors but accidentally wrote a filter that matches nothing:

-- Typo — 'error' vs 'ERROR'
WHERE level = 'error'

Fix: adjust the WHERE clause or remove it temporarily to confirm records are flowing.

Step 2 — Check input and output line counts

In /api/pipelines, compare:

  • inputs[*].lines_total — lines read from source
  • transform.lines_in — lines entering the transform stage
  • transform.lines_out — lines leaving the transform stage
  • outputs[*].lines_total — lines successfully sent

If inputs[*].lines_total is zero, logfwd is not reading any files. Check the glob pattern and confirm new lines are being appended to the files.

If outputs[*].errors is non-zero, there are delivery failures. Check the logs for error sending messages.

Step 3 — Verify file tailing

Use --dry-run to confirm the file input starts without error:

logfwd --config config.yaml --dry-run

Enable debug logging to see file discovery and tail events:

server:
  log_level: debug

Look for log lines like:

[DEBUG] tailing /var/log/pods/app_pod-xyz/app/0.log
[DEBUG] read 4096 bytes from /var/log/pods/app_pod-xyz/app/0.log

If files appear in the log but no records reach the output, the format processing (FramedInput) may be discarding lines. Switch to format: raw temporarily to confirm raw lines are flowing:

input:
  type: file
  path: /var/log/app/*.log
  format: raw

Step 4 — Check for output errors

If outputs[*].errors is increasing, look at stderr for the specific error. Common causes:

| Symptom | Cause | Fix |
|---|---|---|
| connection refused | Collector is down or unreachable | Check network connectivity and collector health |
| 413 Request Entity Too Large | Batch too large | Reduce batch size (future config option) |
| 401 Unauthorized | Missing auth token | Add Authorization header support (not yet implemented) |
| Slow output in stage_seconds | Network latency to collector | Use compression (compression: zstd) or move collector closer |

Reading /api/pipelines

Enable the diagnostics server:

server:
  diagnostics: 0.0.0.0:9090

Then query it:

curl -s http://localhost:9090/api/pipelines | jq .

Response schema

{
  "pipelines": [
    {
      "name": "default",
      "inputs": [
        {
          "name": "pod_logs",
          "type": "file",
          "lines_total": 1024000,
          "bytes_total": 204800000,
          "errors": 0
        }
      ],
      "transform": {
        "sql": "SELECT * FROM logs WHERE level = 'ERROR'",
        "lines_in": 1024000,
        "lines_out": 2048,
        "errors": 0,
        "filter_drop_rate": 0.998
      },
      "outputs": [
        {
          "name": "collector",
          "type": "otlp",
          "lines_total": 2048,
          "bytes_total": 512000,
          "errors": 0
        }
      ],
      "batches": {
        "total": 512,
        "avg_rows": 4.0,
        "flush_by_size": 500,
        "flush_by_timeout": 12,
        "dropped_batches_total": 0,
        "scan_errors_total": 0,
        "last_batch_time_ns": 1712160000000000000
      },
      "stage_seconds": {
        "scan": 1.234567,
        "transform": 0.012345,
        "output": 0.456789
      }
    }
  ],
  "system": {
    "uptime_seconds": 3600,
    "version": "0.1.0"
  }
}

Key fields

| Field | Description |
|---|---|
| inputs[*].lines_total | Total log lines read from this input since start. |
| inputs[*].bytes_total | Total bytes read from this input since start. |
| inputs[*].errors | Total read errors (file open failures, etc.). |
| transform.lines_in | Lines entering the SQL transform. |
| transform.lines_out | Lines produced by the SQL transform after filtering. |
| transform.filter_drop_rate | Fraction of input lines dropped: 1 - (lines_out / lines_in). |
| outputs[*].lines_total | Lines successfully delivered to this output. |
| outputs[*].errors | Delivery errors (network failures, HTTP errors, etc.). |
| batches.total | Total Arrow batches processed. |
| batches.avg_rows | Average rows per batch. |
| batches.flush_by_size | Batches flushed because they reached the row limit. |
| batches.flush_by_timeout | Batches flushed because the timeout expired. |
| batches.dropped_batches_total | Batches dropped due to backpressure or errors. |
| batches.scan_errors_total | Scanner errors (malformed input, etc.). |
| batches.last_batch_time_ns | Unix timestamp (ns) of last processed batch. |
| stage_seconds.scan | Total CPU time spent in the scanner. |
| stage_seconds.transform | Total CPU time spent in DataFusion SQL. |
| stage_seconds.output | Total CPU time spent in the output sink (includes network). |
| system.uptime_seconds | Seconds since logfwd started. |
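The filter_drop_rate field follows directly from the line counters. A small sketch of the computation, matching the sample response above (the zero-input guard is an assumption, not confirmed behaviour):

```rust
/// filter_drop_rate = 1 - lines_out / lines_in, guarding division by zero
/// when no lines have entered the transform yet.
fn filter_drop_rate(lines_in: u64, lines_out: u64) -> f64 {
    if lines_in == 0 {
        0.0
    } else {
        1.0 - lines_out as f64 / lines_in as f64
    }
}
```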

Debug mode / increasing log verbosity

Set log_level in the server block:

server:
  log_level: debug

Available levels (least to most verbose): error, warn, info, debug, trace.

logfwd writes all log output to stderr. Redirect it to a file if needed:

logfwd --config config.yaml 2>logfwd.log

In Kubernetes:

kubectl -n collectors logs daemonset/logfwd

What each level shows

| Level | What you see |
|---|---|
| error | Fatal errors only. |
| warn | Recoverable problems (e.g. failed delivery, retrying). |
| info | Pipeline start/stop, file discovery, batch flush summaries. |
| debug | Per-file tail events, batch sizes, SQL plan details. |
| trace | Per-record scanner output, individual JSON field extraction. |

Warning: trace level generates very large output on busy nodes. Use only for short debugging sessions.


Validating configuration

# Check syntax and field types only (fast)
logfwd --config config.yaml --validate

# Build full pipeline objects including SQL parsing (slower, catches SQL errors)
logfwd --config config.yaml --dry-run

Both commands print a success message or a detailed error to stderr and exit without starting any pipelines.


Checking the SQL transform

Use --dry-run to catch SQL syntax errors:

logfwd --config config.yaml --dry-run
# error: SQL error: Execution error: column "leve" not found

Test your SQL against a sample file using the stdout output:

input:
  type: file
  path: /path/to/sample.log
  format: json

transform: |
  SELECT level, message FROM logs WHERE status >= 500

output:
  type: stdout
  format: console

Run once and inspect the output:

logfwd --config test.yaml

Performance issues

High CPU usage

  1. Check stage_seconds in /api/pipelines. If transform dominates, simplify the SQL query or add indexes via WHERE-clause pushdown.
  2. If scan dominates, check the input volume with inputs[*].bytes_total / system.uptime_seconds. logfwd processes ~1.7 GB/s on a single core; sustained CPU near 100 % on a fast input is expected.

High memory usage

  1. Confirm keep_raw: false (the default). Setting keep_raw: true stores the full JSON line and can double or triple memory consumption.
  2. Check the number of unique field names across your log lines. Each unique field produces at least one Arrow column; very wide schemas use more memory per batch.

Records accumulating / not being shipped

Check outputs[*].errors — if non-zero, delivery is failing and records are being discarded. Enable log_level: warn to see delivery error messages.

Check batches.flush_by_timeout vs batches.flush_by_size. If nearly all flushes are timeout-driven, the input rate is low and latency is bounded by the flush timeout (expected behaviour).

Developing logfwd

Workspace layout

crates/
  logfwd/              Binary crate. CLI, async pipeline orchestration.
  logfwd-core/         Proven kernel. Scanner, parsers, pipeline state machine, OTLP encoding. no_std.
  logfwd-arrow/        Arrow integration. ScanBuilder impls, SIMD backends, RecordBatch builders.
  logfwd-config/       YAML config parsing and validation.
  logfwd-io/           I/O layer. File tailing, TCP/UDP/OTLP inputs, checkpointing, diagnostics.
  logfwd-transform/    DataFusion SQL transforms, UDFs (grok, regexp_extract, geo_lookup).
  logfwd-output/       Output sinks (OTLP, Elasticsearch, Loki, JSON lines, stdout).
  logfwd-bench/        Criterion benchmarks for the scanner pipeline.
  logfwd-competitive-bench/  Comparative benchmarks vs other log agents.
  logfwd-test-utils/   Shared test utilities.
  logfwd-ebpf-proto/   eBPF log capture protocol definitions (experimental).

Build, test, lint, bench, fuzz

just test                    # All tests
just lint                    # fmt + clippy + toml + deny + typos
cargo test -p logfwd-core    # Core crate only (fastest iteration)

RUSTFLAGS="-C target-cpu=native" cargo bench --bench scanner -p logfwd-core

cd crates/logfwd-core && cargo +nightly fuzz run scanner -- -max_total_time=300

Compile caching with sccache

sccache caches Rust compilation artefacts to speed up builds. The project is configured to use it (.cargo/config.toml sets rustc-wrapper = "sccache").

CI and Copilot agents: sccache is installed automatically — no action needed.

Local development: install sccache once:

cargo install sccache --locked

After that, every cargo build / cargo test / just clippy will use the cache automatically via the project’s .cargo/config.toml.

To temporarily disable sccache (e.g. for debugging):

RUSTC_WRAPPER="" cargo build

Local CPU profiling (macOS)

The cpu-profiling feature works locally on macOS, but the shutdown path matters: the profiled logfwd process must receive SIGTERM directly so it can build and write flamegraph.svg before exiting.

The easiest way to run the full File -> OTLP path locally is:

just profile-otlp-local

This recipe:

  • builds logfwd with --features cpu-profiling
  • generates a JSON input file
  • starts a local OTLP blackhole on a fresh port
  • runs logfwd with a file input and OTLP output
  • sends SIGTERM to the real logfwd child process after a short run
  • leaves a temp directory containing config.yaml, logs.json, pipeline.log, blackhole.log, and flamegraph.svg

Useful variants:

just profile-otlp-local 1000000 10

Caveats:

  • Avoid reusing a diagnostics port from another local run; the helper recipe omits diagnostics entirely to keep the profile loop simple.
  • If the cpu-profiling release build fails with No space left on device, run RUSTC_WRAPPER= cargo clean and retry. The profiled release build is large because release keeps debug info for flamegraphs.
  • Killing a wrapper shell is not sufficient; the SIGTERM must reach the actual logfwd process.

Things that will bite you

Hard-won lessons from building the scanner and builder pipeline. See also dev-docs/ARCHITECTURE.md for pipeline data flow.

The deferred builder pattern exists because incremental null-padding is broken

StreamingBuilder collects (row, value) records during scanning and bulk-builds Arrow columns at finish_batch. This seems roundabout — why not write directly to Arrow builders?

Because maintaining column alignment across multiple type builders (str, int, float) per field is a coordination nightmare. When you write an int, you must pad the str and float builders with null. When end_row fires, pad all unwritten fields. When a new field appears mid-batch, back-fill all prior rows. We tried this (IndexedBatchBuilder); proptest found column length mismatches on multi-line NDJSON with varying field sets.

The deferred pattern is correct by construction: each column is built independently. Gaps are nulls. Columns can never mismatch.
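A sketch of the deferred pattern with illustrative types (the real builder targets Arrow arrays, not `Vec<Option<String>>`): record (row, value) pairs per field during the scan, then bulk-build each column at finish time. Rows a field never saw become nulls automatically.

```rust
/// Bulk-build one column from deferred (row, value) records.
/// Gaps are nulls by construction, so column lengths always match.
fn build_column(records: &[(usize, &str)], num_rows: usize) -> Vec<Option<String>> {
    let mut col = vec![None; num_rows];
    for &(row, value) in records {
        col[row] = Some(value.to_string());
    }
    col
}
```

Contrast with the incremental approach: there is no padding step to forget, no back-fill when a field first appears mid-batch, and no cross-builder coordination.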

Chunk-level SIMD classification beats per-line SIMD

We tried three approaches:

  1. Per-line SIMD: load 16 bytes, compare for " and \. Slower than scalar on short strings.
  2. sonic-rs DOM: SIMD JSON parser builds a DOM per line. The DOM allocation is the bottleneck.
  3. Chunk-level classification (StructuralIndex): one portable SIMD pass (via wide crate) over the entire buffer, detecting 10 structural characters simultaneously. Then scan_string is a single trailing_zeros bit-scan.

Approach 3 wins everywhere because classification is amortized across all strings and per-string lookup is O(1).
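The O(1) lookup in approach 3 can be shown with a scalar stand-in: build a 64-bit quote bitmask for a block, then find the next quote with a single trailing_zeros bit-scan. (The real StructuralIndex builds these masks with portable SIMD and masks out escaped quotes first; this sketch skips both.)

```rust
/// Scalar stand-in for SIMD classification: bit i is set iff byte i is '"'.
fn quote_mask(block: &[u8]) -> u64 {
    let mut mask = 0u64;
    for (i, &b) in block.iter().take(64).enumerate() {
        if b == b'"' {
            mask |= 1u64 << i;
        }
    }
    mask
}

/// Position of the first quote at or after `from` — one bit-scan, no byte loop.
fn next_quote(mask: u64, from: u32) -> Option<u32> {
    let remaining = mask & (u64::MAX << from);
    if remaining == 0 { None } else { Some(remaining.trailing_zeros()) }
}
```

The classification cost is paid once per 64-byte block, then every string boundary query inside the block is a single instruction.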

The prefix_xor escape detection has a subtle correctness requirement

The simdjson prefix_xor algorithm detects escaped quotes by computing backslash-run parity. It works for consecutive backslashes (\\\" = escaped quote). But for non-consecutive backslashes like \n\", prefix_xor gives wrong results because it counts ALL backslashes, not per-run.

Our implementation instead iterates over each backslash: mark the next byte as escaped, and skip backslashes that are themselves escaped. This is fast because most JSON has zero or few backslashes. The carry between 64-byte blocks must still be handled: a backslash at the end of one block escapes the first byte of the next.
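The per-run approach with cross-block carry can be sketched like this (mark_escaped is a hypothetical name, and the block here is an arbitrary byte slice rather than a 64-bit mask):

```rust
// Walk the backslashes left to right, mark the byte after each unescaped
// backslash, and remember whether the block ended on an escaping backslash
// so it can escape the first byte of the next block.
fn mark_escaped(block: &[u8], carry_in: bool) -> (Vec<bool>, bool) {
    let mut escaped = vec![false; block.len()];
    let mut carry = carry_in;
    for (i, &b) in block.iter().enumerate() {
        if carry {
            // This byte is escaped by the previous byte (possibly in the
            // previous block); it cannot start a new escape itself.
            escaped[i] = true;
            carry = false;
        } else if b == b'\\' {
            carry = true;
        }
    }
    (escaped, carry)
}
```

Because each backslash is visited exactly once and escaped backslashes are consumed, per-run parity falls out for free, with no whole-block counting.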

The scanner assumes UTF-8 input

The scanner and builders use from_utf8_unchecked throughout. JSON is UTF-8 by spec, so this holds in practice. But the scanner does NOT validate its input — feeding it non-UTF-8 bytes is undefined behavior. The fuzz target guards against this; production code currently doesn’t. See issue #76.

HashMap field lookup was 60% of total scan time

Profiling showed get_or_create_field dominating — SipHash + probe per field per row. Fix: resolve_field does the HashMap lookup once per batch. Subsequent rows use the returned index directly. The ScanBuilder trait’s resolve_field + append_*_by_idx pattern encodes this.
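The shape of that pattern, sketched with plain Vecs (the names here are illustrative, not the real ScanBuilder trait):

```rust
use std::collections::HashMap;

// Pay the HashMap cost (hash + probe) once per batch via resolve_field,
// then append by plain index for every subsequent row.
#[derive(Default)]
struct Columns {
    names: HashMap<String, usize>,
    data: Vec<Vec<String>>,
}

impl Columns {
    // One hashed lookup per field per batch.
    fn resolve_field(&mut self, name: &str) -> usize {
        if let Some(&idx) = self.names.get(name) {
            return idx;
        }
        let idx = self.data.len();
        self.names.insert(name.to_string(), idx);
        self.data.push(Vec::new());
        idx
    }

    // Hot path: direct Vec indexing, no hashing.
    fn append_str_by_idx(&mut self, idx: usize, value: &str) {
        self.data[idx].push(value.to_string());
    }
}
```

The caller resolves each field name once up front and reuses the returned index for every row in the batch, so the hot loop never touches the HashMap.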

StringViewArray memory reporting is misleading

Arrow’s get_array_memory_size() counts the backing buffer once for every column that shares it. If 5 string columns point into the same buffer, the reported memory is 5x the actual usage. The StreamingBuilder produces shared-buffer columns, so its memory reports overcount significantly.

Arrow IPC compression is just a flag

Compressed Arrow IPC is StreamWriter with IpcWriteOptions::try_with_compression(Some(CompressionType::ZSTD)). Any RecordBatch can be compressed. No special builder needed.

keep_raw costs 65% of table memory

The _raw column stores the full JSON line, which is larger than all other columns combined. The default is keep_raw: false.
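If a transform or output genuinely needs the original line, enabling it looks roughly like this; the key's exact placement inside config.yaml is an assumption here, so check the configuration reference:

```yaml
# Hypothetical fragment: consult the configuration reference for where
# keep_raw actually lives in config.yaml.
keep_raw: true   # default: false; retains the full original line in _raw
```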

Always use just clippy, never bare cargo clippy

CI runs cargo clippy -- -D warnings (all warnings are errors). Bare cargo clippy only shows warnings, so code that looks clean locally fails in CI. The just clippy recipe matches CI exactly. Additionally, conditional SIMD compilation means warnings differ between aarch64 (macOS) and x86_64 (CI Linux).

proptest finds bugs unit tests can’t

Every time we thought the scanner was correct, proptest broke it: escapes crossing 64-byte boundaries, fields arriving in different orders, duplicate keys with different types. Run with PROPTEST_CASES=2000 at minimum.

Oracle tests compare against sonic-rs as ground truth. Our scanner does first-writer-wins for duplicate keys; sonic-rs does last-writer-wins. Both valid per RFC 8259; oracle tests skip duplicate-key inputs.
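The two duplicate-key policies, reduced to a plain HashMap (illustrative only; RFC 8259 leaves duplicate-object-key behavior to the implementation, which is why both are valid):

```rust
use std::collections::HashMap;

// First-writer-wins: keep the first value seen for a key (our scanner).
fn first_writer_wins<'a>(pairs: &[(&'a str, &'a str)]) -> HashMap<&'a str, &'a str> {
    let mut map = HashMap::new();
    for &(k, v) in pairs {
        map.entry(k).or_insert(v);
    }
    map
}

// Last-writer-wins: overwrite with the latest value (sonic-rs).
fn last_writer_wins<'a>(pairs: &[(&'a str, &'a str)]) -> HashMap<&'a str, &'a str> {
    let mut map = HashMap::new();
    for &(k, v) in pairs {
        map.insert(k, v);
    }
    map
}
```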

Two scan modes serve different purposes

  • Scanner::scan_detached: produces self-contained StringArray columns. Input buffer can be freed. For persistence and compression.
  • Scanner::scan: zero-copy StringViewArray views into bytes::Bytes buffer. 20% faster. Buffer must stay alive. For real-time query-then-discard.

Both use the same StreamingBuilder (which implements ScanBuilder), sharing the generic scan_streaming() loop.
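The ownership difference between the two modes, reduced to owned copies versus borrowed slices (illustrative only; the real APIs return Arrow arrays, and these function names are hypothetical):

```rust
// "Detached": copy each value into an owned String, so the input buffer
// can be dropped afterwards. Analogous to scan_detached.
fn scan_detached(buf: &[u8], spans: &[(usize, usize)]) -> Vec<String> {
    spans
        .iter()
        .map(|&(s, e)| String::from_utf8_lossy(&buf[s..e]).into_owned())
        .collect()
}

// "Zero-copy": keep slices into the buffer. No per-value allocation, but
// the buffer must outlive the views. Analogous to scan.
fn scan_zero_copy<'a>(buf: &'a [u8], spans: &[(usize, usize)]) -> Vec<&'a [u8]> {
    spans.iter().map(|&(s, e)| &buf[s..e]).collect()
}
```

The borrow checker enforces the real constraint here too: the zero-copy views cannot outlive buf, just as StringViewArray columns cannot outlive the bytes::Bytes buffer.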

Building & Testing

Prerequisites

# Install Rust stable
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install task runner
cargo install just

# Install development tools
just install-tools

Common commands

just ci          # Run full CI suite (lint + test)
just fmt         # Format code
just clippy      # Run lints
just test        # Run all tests
just bench       # Run Criterion microbenchmarks
just build       # Build release binary

Project structure

crates/logfwd/           # Binary entry point, CLI, pipeline orchestrator
crates/logfwd-core/      # Scanner, file tailer, CRI parser, diagnostics
crates/logfwd-config/    # YAML config parser
crates/logfwd-output/    # Output sinks (OTLP, HTTP, stdout)
crates/logfwd-transform/ # SQL transforms via DataFusion, UDFs
crates/logfwd-bench/     # Benchmarks (Criterion + exploratory)

Benchmarking

Criterion microbenchmarks

just bench

Measures scanner throughput, OTLP encoding speed, and compression performance.

Competitive benchmarks

Compare logfwd against vector, fluent-bit, filebeat, otelcol, and vlagent:

# Binary mode (local dev)
just bench-competitive --lines 1000000 --scenarios passthrough,json_parse,filter

# Docker mode (resource-limited)
just bench-competitive --lines 5000000 --docker --cpus 1 --memory 1g --markdown

Exploratory profiling

# Stage-by-stage profile
cargo run -p logfwd-bench --release --bin e2e-profile

# Memory analysis
cargo run -p logfwd-bench --release --bin sizes

# Real RSS measurement
cargo run -p logfwd-bench --release --bin rss

Nightly benchmarks

Results are published to GitHub Pages automatically via the nightly benchmark workflow. View at: strawgate.github.io/memagent/bench/