logfwd
A high-performance log forwarder built in Rust. Tails log files, parses JSON and Kubernetes CRI format with portable SIMD, transforms every batch with DataFusion SQL, and ships to OTLP, Elasticsearch, Loki, HTTP, or stdout.
log files → SIMD parse → Arrow RecordBatch → SQL transform → output
logfwd is a single static binary with no runtime dependencies. Point it at log files, write a SQL query to filter and reshape the data, and forward the results to any OTLP-compatible collector — or directly to Elasticsearch, Loki, or stdout. SQL transforms are the core idea: instead of learning a vendor-specific DSL, you write standard SQL to control exactly what gets shipped.
Get started
- Installation — Binary download, Docker, or build from source
- Quick Start — Working pipeline in 60 seconds, no external dependencies
- Your First Pipeline — Production config with monitoring and validation
Configure
- Configuration Reference — All YAML fields, input/output types, enrichment
- Input Types — File, TCP, UDP, OTLP receiver, generator
- Output Types — OTLP, HTTP, Elasticsearch, Loki, stdout
- SQL Transforms — Filter, reshape, extract — full DataFusion SQL
Deploy
- Kubernetes DaemonSet — Manifest, resource sizing, CRI format
- Docker — Container images and compose files
- Monitoring — Diagnostics API, Prometheus metrics, health checks
Learn more
- Pipeline Design — How data flows from input to output
- Scanner — How the parser works
- Performance — Benchmarks and tuning
- Troubleshooting — Common errors and how to fix them
Installation
Binary download
Download the latest release from GitHub Releases. Binaries are available for:
| Platform | Artifact |
|---|---|
| Linux x86_64 | logfwd-linux-amd64 |
| Linux ARM64 | logfwd-linux-arm64 |
| macOS x86_64 (Intel) | logfwd-darwin-amd64 |
| macOS ARM64 (Apple Silicon) | logfwd-darwin-arm64 |
# Example: Linux x86_64
curl -fsSL https://github.com/strawgate/memagent/releases/latest/download/logfwd-linux-amd64 -o logfwd
chmod +x logfwd
sudo mv logfwd /usr/local/bin/
Docker
docker run --rm \
-v /var/log:/var/log:ro \
-v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
ghcr.io/strawgate/memagent:latest --config /etc/logfwd/config.yaml
Images are published to ghcr.io/strawgate/memagent for linux/amd64 and linux/arm64. See Docker deployment for compose files and volume configuration.
From source
Requires the Rust stable toolchain (1.85+).
git clone https://github.com/strawgate/memagent.git
cd memagent
cargo build --release -p logfwd
The binary is at ./target/release/logfwd. Copy it wherever you need:
sudo cp target/release/logfwd /usr/local/bin/
Kubernetes
A ready-to-apply DaemonSet manifest is included in the repo:
kubectl apply -f deploy/daemonset.yml
See the Kubernetes deployment guide for resource sizing and collector integration.
Verify installation
logfwd --version
logfwd --help
Next step
Head to the Quick Start to run your first pipeline.
Quick Start
Get a working logfwd pipeline in 60 seconds, then build on it.
Prerequisites
You need the logfwd binary. See Installation for all options, or grab it quickly:
# Download the latest release (macOS Apple Silicon shown)
curl -fsSL https://github.com/strawgate/memagent/releases/latest/download/logfwd-darwin-arm64 -o logfwd
chmod +x logfwd
# Or build from source (Rust 1.85+)
cargo build --release -p logfwd && cp target/release/logfwd .
Verify it works:
./logfwd --version
Stage 1: See log output
Generate synthetic JSON log lines and print them to your terminal.
Generate test data:
./logfwd --generate-json 10000 logs.json
This creates 10,000 JSON log lines with fields like level, message, status, duration_ms, and service.
Create a config file:
# config.yaml
input:
type: file
path: logs.json
format: json
output:
type: stdout
format: console
Run it:
./logfwd --config config.yaml
You’ll see a startup banner followed by colored output for every log line:
logfwd v0.1.0
✓ default
in file logs.json
out stdout
ready · 1 pipeline
10:30:00.000Z INFO request handled GET /api/v1/users/10000 duration_ms=1 request_id=... service=myapp status=200
10:30:00.000Z WARN request handled POST /api/v2/orders/10015 duration_ms=87 request_id=... service=myapp status=429
10:30:00.000Z ERROR request handled GET /health/10021 duration_ms=40 request_id=... service=myapp status=503
10:30:00.000Z DEBUG request handled GET /api/v1/users/10033 duration_ms=3 request_id=... service=myapp status=200
...
logfwd parsed every JSON line, detected field types automatically (strings, integers, floats), built Arrow RecordBatches, and printed them in a human-readable format. All 10,000 lines stream through in under a second.
Note: logfwd exits when it reaches the end of a finite file. In production you’d point it at a log file that’s actively being appended to, and logfwd will tail it continuously, like tail -f but with parsing and SQL.
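The automatic type detection described above can be sketched in a few lines. This is an illustrative Python sketch of the idea, not logfwd's actual SIMD scanner; the type names are the Arrow-style categories the docs describe (strings, integers, floats, with booleans stored as strings).

```python
import json

# Illustrative sketch of per-field type detection (not logfwd's actual code):
# map each JSON value to the Arrow-style column type logfwd would infer.
def infer_column_types(line: str) -> dict:
    types = {}
    for field, value in json.loads(line).items():
        if isinstance(value, bool):
            types[field] = "string"      # booleans become "true"/"false" strings
        elif isinstance(value, int):
            types[field] = "int64"
        elif isinstance(value, float):
            types[field] = "float64"
        else:
            types[field] = "string"      # strings, objects, arrays
    return types

line = '{"level": "INFO", "status": 200, "duration_ms": 1.5, "service": "myapp"}'
print(infer_column_types(line))
```

Note the boolean check comes before the integer check: in Python, `True` is an instance of `int`, so the order matters.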
Stage 2: Filter with SQL
Now add a SQL transform to keep only what matters. This is the core reason to use logfwd — every batch of parsed records becomes a DataFusion SQL table named logs.
First, regenerate the test data — logfwd tracks file positions between runs so it doesn’t reprocess data it has already seen:
./logfwd --generate-json 10000 logs.json
Update your config:
# config.yaml
input:
type: file
path: logs.json
format: json
transform: |
SELECT level, message, status, duration_ms
FROM logs
WHERE level = 'ERROR' AND duration_ms > 50
output:
type: stdout
format: console
Run it:
./logfwd --config config.yaml
Now you see far fewer lines — only errors with slow durations:
ERROR request handled GET /api/v2/products/10049 status=500 duration_ms=92
ERROR request handled POST /api/v1/orders/10121 status=503 duration_ms=78
...
Only the four columns from the SELECT appear — level, message, status, duration_ms. Everything else was filtered out before it reached the output. In production, this means you only ship the logs you care about — saving bandwidth, storage, and money.
Try a more advanced transform:
transform: |
SELECT
level,
regexp_extract(message, '(GET|POST|PUT|DELETE) (\S+)', 2) AS path,
status,
duration_ms
FROM logs
WHERE status >= 400
This extracts the URL path from the message with a regex and keeps only 4xx/5xx responses. Full SQL is available: JOIN, GROUP BY, HAVING, and subqueries all work.
Note: ORDER BY is valid SQL and works correctly within each batch. However, logfwd processes data in streaming batches, so ordering applies only within a single batch, not globally across all data.
Built-in UDFs: int(), float(), regexp_extract(), grok(), json(), json_int(), json_float(). The geo_lookup() UDF is also available when a geo-IP database is configured. See the SQL Transforms guide for the complete reference.
Stage 3: Ship to a collector
In production, logfwd sends OTLP protobuf to an OpenTelemetry Collector, Grafana Alloy, or any OTLP-compatible backend. Let’s try it with logfwd’s built-in blackhole receiver — it accepts OTLP data and discards it, so you can test the full pipeline without external infrastructure.
Start the blackhole receiver:
./logfwd --blackhole &
# logfwd blackhole starting on 127.0.0.1:4318
Regenerate test data and update your config to send OTLP:
./logfwd --generate-json 10000 logs.json
# config.yaml
input:
type: file
path: logs.json
format: json
transform: |
SELECT level, message, status, duration_ms
FROM logs
WHERE level IN ('ERROR', 'WARN')
output:
type: otlp
endpoint: http://127.0.0.1:4318/v1/logs
compression: zstd
Run it:
./logfwd --config config.yaml
logfwd parses the JSON, runs the SQL filter, encodes matching records as OTLP protobuf with zstd compression, and ships them over HTTP. The blackhole receiver accepts everything.
To ship to a real collector, replace the endpoint:
output:
type: otlp
endpoint: http://otel-collector:4318/v1/logs # your real collector
compression: zstd
logfwd works out of the box with:
- OpenTelemetry Collector
- Grafana Alloy
- Any backend that speaks OTLP over HTTP or gRPC
Validate before deploying
Before running in production, verify your config. --validate catches YAML errors; --dry-run goes further and compiles the SQL against the Arrow schema, catching column name typos and type mismatches before any data flows.
./logfwd --config config.yaml --validate
# ready: default
# config ok: 1 pipeline(s)
./logfwd --config config.yaml --dry-run
# ready: default
# dry run ok: 1 pipeline(s)
What’s next
| Guide | What you’ll learn |
|---|---|
| Your First Pipeline | Production config with monitoring, multi-pipeline setup, CRI format |
| SQL Transforms | Full SQL reference — JOINs, UDFs, enrichment tables, column naming |
| Configuration Reference | Every YAML field, input/output type, and option |
| Kubernetes Deployment | DaemonSet, resource sizing, OTLP collector integration |
| Troubleshooting | Common errors, debug mode, diagnostics API |
Your First Pipeline
This guide walks through setting up logfwd to tail application logs, filter by severity, and forward to an OpenTelemetry collector.
The config
input:
type: file
path: /var/log/app/*.log
format: json
transform: |
SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')
output:
type: otlp
endpoint: https://otel-collector:4318
protocol: http
compression: zstd
server:
diagnostics: 0.0.0.0:9090
Run it
logfwd --config pipeline.yaml
Monitor it
# Health check
curl http://localhost:9090/health
# Aggregate stats (JSON)
curl http://localhost:9090/api/stats | jq .
# Pipeline details (JSON)
curl http://localhost:9090/api/pipelines | jq .
Validate config without running
logfwd --config pipeline.yaml --validate
# config ok: 1 pipeline(s)
logfwd --config pipeline.yaml --dry-run
# logfwd v0.1.0
# pipeline default: 1 input(s) -> SELECT * FROM logs WHERE ... -> 1 output(s)
# ready: default
# dry run ok: 1 pipeline(s) constructed
Configuration Reference
logfwd is configured with a YAML file passed via --config <path>.
Overview
logfwd supports two layout styles:
- Simple — single pipeline with top-level input, transform, and output keys.
- Advanced — multiple named pipelines under a pipelines map.
Environment variables are expanded using ${VAR} syntax anywhere in the file. If a variable is not set, the placeholder is left as-is.
Simple layout
input:
type: file
path: /var/log/app/*.log
format: json
transform: SELECT level_str, message_str, status_int FROM logs WHERE status_int >= 400
output:
type: otlp
endpoint: otel-collector:4317
compression: zstd
server:
diagnostics: 0.0.0.0:9090
log_level: info
Advanced layout
pipelines:
errors:
inputs:
- name: pod_logs
type: file
path: /var/log/pods/**/*.log
format: cri
transform: SELECT * FROM logs WHERE level_str = 'ERROR'
outputs:
- type: otlp
endpoint: otel-collector:4317
debug:
inputs:
- type: file
path: /var/log/pods/**/*.log
format: cri
outputs:
- type: stdout
format: json
server:
diagnostics: 0.0.0.0:9090
The two layouts cannot be mixed: specifying both input/output at the top level and
a pipelines map is a validation error.
Input configuration
Each pipeline requires at least one input. Use a single mapping for one input or a YAML sequence for multiple inputs.
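For example, a single pipeline can tail two globs at once (a sketch based on the sequence form described above; the paths and names are illustrative):

```yaml
input:
  - name: app_logs
    type: file
    path: /var/log/app/*.log
    format: json
  - name: pod_logs
    type: file
    path: /var/log/pods/**/*.log
    format: cri
```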
Common fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Input type. See Input types. |
name | string | No | Friendly name shown in diagnostics. |
format | string | No | Log format. See Formats. Defaults to auto. |
file input
Tail one or more log files that match a glob pattern.
| Field | Type | Required | Description |
|---|---|---|---|
path | string | Yes | Glob pattern, e.g. /var/log/pods/**/*.log. |
input:
type: file
path: /var/log/pods/**/*.log
format: cri
udp input (not yet implemented)
Listen for log lines on a UDP socket.
| Field | Type | Required | Description |
|---|---|---|---|
listen | string | Yes | host:port, e.g. 0.0.0.0:514. |
input:
type: udp
listen: 0.0.0.0:514
format: syslog
tcp input (not yet implemented)
Accept log lines on a TCP socket.
| Field | Type | Required | Description |
|---|---|---|---|
listen | string | Yes | host:port, e.g. 0.0.0.0:5140. |
input:
type: tcp
listen: 0.0.0.0:5140
format: json
otlp input (not yet implemented)
Receive OTLP log records from another agent or SDK.
No extra fields required; the listen address will be configurable in a future release.
Input types
| Value | Status | Description |
|---|---|---|
file | Implemented | Tail files matching a glob pattern. |
udp | Planned | Receive log lines over UDP. |
tcp | Planned | Accept log lines over TCP. |
otlp | Planned | Receive OTLP logs. |
Formats
The format field controls how raw bytes from the input are parsed into log records.
| Value | Description |
|---|---|
auto | Auto-detect (default). Tries CRI first, then JSON, then raw. |
cri | CRI container log format (<timestamp> <stream> <flags> <message>). Multi-line log reassembly via the P partial flag is supported. |
json | Newline-delimited JSON. Each line must be a single JSON object. |
raw | Treat each line as an opaque string stored in _raw_str. |
logfmt | Key=value pairs (e.g. level=info msg="hello"). Not yet implemented. |
syslog | RFC 5424 syslog. Not yet implemented. |
console | Human-readable coloured output for interactive debugging. Output mode only. |
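The CRI format in the table above is simple enough to sketch: each line is `<timestamp> <stream> <flags> <message>`, with the P flag marking a partial line. This is an illustrative Python sketch, not the SIMD scanner logfwd actually uses, and the output field names are illustrative.

```python
# Illustrative sketch of CRI line parsing (not logfwd's actual code).
# CRI format: "<timestamp> <stream> <flags> <message>", e.g.
#   2024-01-15T10:30:00.000000000Z stdout F hello world
def parse_cri(line: str) -> dict:
    timestamp, stream, flags, message = line.split(" ", 3)
    return {
        "_time": timestamp,
        "_stream": stream,          # "stdout" or "stderr"
        "partial": flags == "P",    # partial line, continued in the next record
        "message": message,
    }

rec = parse_cri("2024-01-15T10:30:00.000000000Z stdout F hello world")
print(rec)
```

The real scanner also reassembles multi-line logs by joining consecutive P-flagged records, which this sketch omits.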
Output configuration
Each pipeline requires at least one output.
Common fields
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | Output type. See Output types. |
name | string | No | Friendly name shown in diagnostics. |
otlp output
Send log records as OTLP protobuf to an OpenTelemetry collector.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
endpoint | string | Yes | — | Collector address, e.g. otel-collector:4317 (gRPC) or http://otel-collector:4318 (HTTP). |
protocol | string | No | http | http or grpc. |
compression | string | No | none | zstd to compress the request body. |
output:
type: otlp
endpoint: otel-collector:4317
protocol: grpc
compression: zstd
http output
POST log records as newline-delimited JSON to an HTTP endpoint.
| Field | Type | Required | Description |
|---|---|---|---|
endpoint | string | Yes | Full URL, e.g. http://ingest.example.com/logs. |
compression | string | No | zstd to compress the request body. |
output:
type: http
endpoint: http://ingest.example.com/logs
compression: zstd
stdout output
Print records to standard output for local debugging.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
format | string | No | json | json (newline-delimited JSON) or console (coloured text). |
output:
type: stdout
format: console
elasticsearch output (stub)
Ship to Elasticsearch via the bulk API. Not yet functional.
| Field | Type | Required | Description |
|---|---|---|---|
endpoint | string | Yes | Elasticsearch base URL. |
loki output (stub)
Push to Grafana Loki. Not yet functional.
| Field | Type | Required | Description |
|---|---|---|---|
endpoint | string | Yes | Loki push URL. |
file_out output (partial)
Write records to a file.
| Field | Type | Required | Description |
|---|---|---|---|
path | string | Yes | Destination file path. |
parquet output (stub)
Write records to Parquet files. Not yet functional.
| Field | Type | Required | Description |
|---|---|---|---|
path | string | Yes | Destination file path. |
Output types
| Value | Status | Description |
|---|---|---|
otlp | Implemented | OTLP protobuf over HTTP or gRPC. |
http | Implemented | JSON lines over HTTP POST. |
stdout | Implemented | Print to stdout (JSON or coloured text). |
elasticsearch | Stub | Elasticsearch bulk API. |
loki | Stub | Grafana Loki push API. |
file_out | Partial | Write to a file. |
parquet | Stub | Write Parquet files. |
SQL transform
The optional transform field contains a DataFusion SQL query that is applied to every
Arrow RecordBatch produced by the scanner. The source table is always named logs.
transform: SELECT level_str, message_str, status_int FROM logs WHERE status_int >= 400
Multi-line SQL is supported with YAML block scalars:
transform: |
SELECT
level_str,
message_str,
regexp_extract(message_str, 'request_id=([a-f0-9-]+)', 1) AS request_id_str,
status_int
FROM logs
WHERE level_str IN ('ERROR', 'WARN')
AND status_int >= 400
Column naming convention
The scanner maps each JSON field to one or more typed Arrow columns following the
{field}_{type} naming convention:
| JSON value type | Arrow column type | Column name pattern | Example |
|---|---|---|---|
| String | StringArray | {field}_str | level_str |
| Integer | Int64Array | {field}_int | status_int |
| Float | Float64Array | {field}_float | latency_ms_float |
| Boolean | StringArray ("true"/"false") | {field}_str | enabled_str |
| Null | null in all type columns | — | — |
| Object / Array | StringArray (raw JSON) | {field}_str | metadata_str |
When a field contains mixed types across rows, separate columns are emitted:
status_int and status_str can coexist in the same batch.
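A transform can fold the two columns back together. This is a sketch using the int() UDF documented below; COALESCE picks the first non-NULL value per row:

```sql
SELECT COALESCE(status_int, int(status_str)) AS status
FROM logs
WHERE COALESCE(status_int, int(status_str)) >= 400
```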
Special columns added by the scanner:
| Column | Type | Description |
|---|---|---|
_file_str | string | Absolute path of the source file (file inputs only). |
_raw_str | string | Original JSON line (only when keep_raw: true). |
_time_ns_int | int64 | Timestamp from CRI header in nanoseconds (CRI inputs only). |
_stream_str | string | CRI stream name (stdout/stderr). |
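For example, a CRI pipeline can select these columns directly (a sketch; assumes a cri-format file input):

```sql
SELECT _time_ns_int, _stream_str, _file_str, message_str
FROM logs
WHERE _stream_str = 'stderr'
```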
Built-in UDFs
| Function | Signature | Description |
|---|---|---|
int(expr) | int(any) → int64 | Cast any value to int64. Returns NULL on failure. |
float(expr) | float(any) → float64 | Cast any value to float64. Returns NULL on failure. |
grok(pattern, input) | grok(utf8, utf8) → utf8 | Apply a Grok pattern to input and return the first capture as JSON. |
regexp_extract(input, pattern, group) | regexp_extract(utf8, utf8, int64) → utf8 | Return capture group group from a regex match. |
Examples:
-- Cast a string column to int
SELECT int(status_str) AS status_int FROM logs
-- Extract a field with Grok
SELECT grok('%{IP:client} %{WORD:method} %{URIPATHPARAM:path}', message_str) AS parsed_str FROM logs
-- Extract a named group with regex
SELECT regexp_extract(message_str, 'user=([a-z]+)', 1) AS user_str FROM logs
-- Type-cast from environment-injected string
SELECT float(duration_str) AS duration_ms_float FROM logs
Enrichment tables
Enrichment tables are made available as SQL tables that can be joined in the transform
query. They are declared under the top-level enrichment key.
enrichment:
k8s:
type: k8s_path
host:
type: host_info
labels:
type: static
fields:
environment: production
region: us-east-1
k8s_path enrichment
Parses Kubernetes pod log paths (e.g.
/var/log/pods/<namespace>_<pod>_<uid>/<container>/) to extract metadata.
SELECT l.level_str, l.message_str, k.namespace, k.pod_name, k.container_name
FROM logs l
JOIN k8s k ON l._file_str = k.log_path_prefix
Columns exposed by k8s:
| Column | Description |
|---|---|
log_path_prefix | Directory prefix used as join key. |
namespace | Kubernetes namespace. |
pod_name | Pod name. |
pod_uid | Pod UID. |
container_name | Container name. |
host_info enrichment
Exposes the hostname of the machine running logfwd.
| Column | Description |
|---|---|
hostname | System hostname. |
static enrichment
A table with one row containing user-defined label columns.
enrichment:
labels:
type: static
fields:
environment: production
cluster: us-east-1
tier: backend
SELECT l.*, lbl.environment, lbl.cluster
FROM logs l CROSS JOIN labels lbl
Server configuration
The optional server block controls the diagnostics server and observability settings.
| Field | Type | Default | Description |
|---|---|---|---|
diagnostics | string | none | host:port to listen for HTTP diagnostics. See Diagnostics API. |
log_level | string | info | Log verbosity. One of error, warn, info, debug, trace. |
metrics_endpoint | string | none | OTLP endpoint for periodic metrics push, e.g. http://otel-collector:4318. |
metrics_interval_secs | integer | 60 | Push interval for OTLP metrics in seconds. |
server:
diagnostics: 0.0.0.0:9090
log_level: info
metrics_endpoint: http://otel-collector:4318
metrics_interval_secs: 30
Diagnostics API
When server.diagnostics is configured, logfwd exposes an HTTP API for monitoring and troubleshooting.
| Route | Method | Description |
|---|---|---|
/ | GET | Dashboard HTML (visual explorer for metrics and traces). |
/health | GET | Liveness probe. Returns 200 OK if the server is running. |
/ready | GET | Readiness probe. Returns 200 OK once pipelines are initialized. |
/api/pipelines | GET | Per-pipeline counters (lines, bytes, errors, batches, stage timing). |
/api/stats | GET | Aggregate process stats (uptime, RSS, CPU, aggregate line counts). |
/api/config | GET | Currently loaded YAML configuration and its file path. |
/api/logs | GET | Recent log lines from logfwd’s own stderr (ring buffer). |
/api/history | GET | Time-series data (1-hour window) for dashboard charts. |
/api/traces | GET | Recent batch processing spans for detailed latency analysis. |
Note: The /metrics (Prometheus) endpoint was removed in favor of /api/pipelines. It returns 410 Gone. The /api/system route mentioned in some older documentation does not exist.
Storage configuration
The optional storage block controls where logfwd persists state (checkpoints, disk
queue).
| Field | Type | Default | Description |
|---|---|---|---|
data_dir | string | none | Directory for state files. Created if it does not exist. |
storage:
data_dir: /var/lib/logfwd
Environment variable substitution
Any value in the config file can reference an environment variable with ${VAR}:
output:
type: otlp
endpoint: ${OTEL_COLLECTOR_ADDR}
server:
metrics_endpoint: ${METRICS_PUSH_URL}
If the variable is not set, the placeholder is left as-is (no error).
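The substitution rule can be sketched as follows. This is illustrative only; logfwd's actual implementation may differ in detail.

```python
import os
import re

# Illustrative sketch of ${VAR} expansion (not logfwd's actual code):
# replace ${VAR} with the environment value, or leave the placeholder
# untouched when the variable is not set.
def expand_env(text: str) -> str:
    def repl(match: re.Match) -> str:
        value = os.environ.get(match.group(1))
        return value if value is not None else match.group(0)
    return re.sub(r"\$\{(\w+)\}", repl, text)

os.environ["OTEL_COLLECTOR_ADDR"] = "otel-collector:4317"
print(expand_env("endpoint: ${OTEL_COLLECTOR_ADDR}"))  # variable set: substituted
print(expand_env("endpoint: ${MISSING_VAR}"))          # unset: left as-is
```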
Complete example
pipelines:
app:
inputs:
- name: pod_logs
type: file
path: /var/log/pods/**/*.log
format: cri
transform: |
SELECT
l.level_str,
l.message_str,
l.status_int,
k.namespace,
k.pod_name,
k.container_name,
lbl.environment
FROM logs l
LEFT JOIN k8s k ON l._file_str = k.log_path_prefix
CROSS JOIN labels lbl
WHERE l.level_str IN ('ERROR', 'WARN')
OR l.status_int >= 500
outputs:
- name: collector
type: otlp
endpoint: ${OTEL_ENDPOINT}
protocol: grpc
compression: zstd
- name: debug
type: stdout
format: console
enrichment:
k8s:
type: k8s_path
labels:
type: static
fields:
environment: ${ENVIRONMENT}
cluster: ${CLUSTER_NAME}
server:
diagnostics: 0.0.0.0:9090
log_level: info
metrics_endpoint: ${OTEL_ENDPOINT}
metrics_interval_secs: 60
storage:
data_dir: /var/lib/logfwd
Input Types
File
Tail one or more log files with glob pattern support.
input:
type: file
path: /var/log/pods/**/*.log
format: cri # cri | json | raw | auto
- Glob re-scanning: New files matching the pattern are discovered automatically (every 5s).
- Rotation handling: Detects file rotation (rename + create) and switches to the new file. Drains remaining data from the old file before switching.
- Formats: CRI (Kubernetes container runtime), JSON (newline-delimited), raw (plain text; each line becomes {"_raw": "..."})
Generator
Emit synthetic JSON log lines for benchmarking and pipeline testing. No external data source is required.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
listen | string | No | (unlimited) | Target events per second, e.g. "50000". Omit for maximum throughput. |
input:
type: generator
listen: "50000" # ~50,000 events/sec; omit for unlimited
Use --generate-json <n> <file> on the CLI to write a fixed number of lines to a file instead.
UDP (not yet implemented)
Receive log lines on a UDP socket.
input:
type: udp
listen: 0.0.0.0:514
format: syslog
TCP (not yet implemented)
Accept log lines on a TCP socket.
input:
type: tcp
listen: 0.0.0.0:5140
format: json
OTLP (not yet implemented)
Receive OTLP log records from another agent or SDK.
input:
type: otlp
Output Types
OTLP
Send logs as OpenTelemetry protobuf over HTTP or gRPC.
output:
type: otlp
endpoint: https://collector:4318
protocol: http # http | grpc
compression: zstd # zstd | none (gzip not yet supported)
auth:
bearer_token: "${OTEL_TOKEN}"
HTTP (JSON Lines)
POST newline-delimited JSON to any HTTP endpoint.
output:
type: http
endpoint: https://logging-service:9200
auth:
headers:
X-API-Key: "${API_KEY}"
Stdout
Print to stdout for debugging/testing.
output:
type: stdout
format: console # console | json | text
- console: Colored, human-readable (timestamp, level, message, key=value pairs)
- json: One JSON object per line (machine-parseable)
- text: Raw text (uses the _raw column if available)
Elasticsearch
Ship logs to Elasticsearch via the Bulk API.
output:
type: elasticsearch
endpoint: https://es-cluster:9200
index: logs # default: "logs"
compression: gzip # gzip | none (optional)
request_mode: buffered # buffered | streaming (optional, default buffered)
- Bulk API: Per-document error handling with automatic retries.
- Batch splitting: Large payloads are split automatically to stay within Elasticsearch limits.
- Compression: Optional gzip compression for reduced network usage.
- Streaming mode: Experimental chunked HTTP request bodies for benchmarking against hosted/serverless clusters.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
endpoint | string | Yes | — | Elasticsearch URL |
index | string | No | logs | Target index name |
compression | string | No | none | gzip or none |
request_mode | string | No | buffered | buffered or experimental streaming (streaming currently requires compression: none) |
Loki
Push logs to Grafana Loki with automatic label grouping.
output:
type: loki
endpoint: http://loki:3100
| Field | Type | Required | Description |
|---|---|---|---|
endpoint | string | Yes | Loki push URL |
Multiple outputs (fan-out)
outputs:
- name: collector
type: otlp
endpoint: https://collector:4318
- name: debug
type: stdout
format: console
SQL Transforms
logfwd uses Apache DataFusion to run SQL queries on your log data. Every log
line becomes a row in a virtual logs table.
Column naming
The JSON scanner creates columns using bare field names with native Arrow types:
- level — Utf8View (string)
- status — Int64 (integer)
- latency_ms — Float64 (float)
Use bare names directly in SQL:
SELECT * FROM logs WHERE level = 'ERROR'
When a field has mixed types across rows (e.g., status is sometimes an int,
sometimes a string), the builder emits a StructArray conflict column
(status: Struct { int: Int64, str: Utf8View }).
Before SQL execution, conflict columns are normalized to flat Utf8 columns
via COALESCE(CAST(int AS Utf8), CAST(float AS Utf8), str).
Use int(status) or float(status) for numeric operations on these columns.
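For example (a sketch assuming status holds a mix of integer and string values across rows):

```sql
SELECT message, int(status) AS status_code
FROM logs
WHERE int(status) >= 500
```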
Custom UDFs
logfwd registers the following custom scalar functions in addition to DataFusion’s built-in functions.
json(column, key) — extract a string value from JSON
json(column: Utf8, key: Utf8) → Utf8
Extracts a field from a raw JSON string column and returns its value as a
string. Integer and float values are coerced to their string representation.
Returns NULL when the key is not present.
SELECT json(_raw, 'host') AS host FROM logs
json_int(column, key) — extract an integer value from JSON
json_int(column: Utf8, key: Utf8) → Int64
Extracts a field from a raw JSON string column and returns its value as a
64-bit integer. Returns NULL when the key is not present or the value is not
a JSON number (e.g. a quoted string "200" returns NULL).
SELECT json_int(_raw, 'status') AS status FROM logs
WHERE json_int(_raw, 'status') >= 500
json_float(column, key) — extract a float value from JSON
json_float(column: Utf8, key: Utf8) → Float64
Extracts a field from a raw JSON string column and returns its value as a
64-bit float. Integer values are promoted to float. Returns NULL when the
key is not present or the value is a quoted string.
SELECT json_float(_raw, 'duration') AS duration_sec FROM logs
WHERE json_float(_raw, 'duration') > 1.5
grok(column, pattern) — grok pattern extraction
grok(column: Utf8, pattern: Utf8) → Struct
Applies a Logstash-style grok pattern to each row and returns a struct with
one Utf8 field per named capture (%{PATTERN:name}). Non-matching rows
produce NULL struct fields.
Built-in patterns: IP, IPV4, IPV6, NUMBER, INT, BASE10NUM,
WORD, NOTSPACE, SPACE, DATA, GREEDYDATA, QUOTEDSTRING, UUID,
MAC, URIPATH, URIPATHPARAM, URI, TIMESTAMP_ISO8601, DATE,
TIME, LOGLEVEL, HOSTNAME, EMAILADDRESS.
SELECT grok(message, '%{WORD:method} %{URIPATH:path} %{NUMBER:status}')
FROM logs
regexp_extract(column, pattern, group) — regex capture group extraction
regexp_extract(column: Utf8, pattern: Utf8, group: Int64) → Utf8
Returns the capture group at the given index (1-based), or the full match
when group is 0. Returns NULL when the pattern does not match or the
group index is out of range.
SELECT regexp_extract(message, 'status=(\d+)', 1) AS status_code FROM logs
geo_lookup(ip) — GeoIP enrichment
geo_lookup(ip: Utf8) → Struct
Looks up an IP address in the configured MaxMind database and returns a struct
with fields: country_code (Utf8), country_name (Utf8), city (Utf8),
region (Utf8), latitude (Float64), longitude (Float64), asn (Int64),
org (Utf8). All fields are nullable — private/unresolvable IPs return
NULL. Requires a geo_database enrichment in the pipeline config.
Use get_field() to extract individual fields:
enrichment:
- type: geo_database
path: /data/GeoLite2-City.mmdb
transform: |
SELECT *, get_field(geo, 'city') AS city, get_field(geo, 'country_code') AS country
FROM (SELECT *, geo_lookup(source_ip) AS geo FROM logs)
Examples
Filter by severity
transform: "SELECT * FROM logs WHERE level IN ('ERROR', 'WARN')"
Drop fields
transform: "SELECT * EXCEPT (request_id, trace_id) FROM logs"
Rename fields
transform: "SELECT duration_ms AS latency_ms, * EXCEPT (duration_ms) FROM logs"
Computed fields
transform: |
SELECT *,
CASE WHEN duration_ms > 200 THEN 'slow' ELSE 'fast' END AS speed
FROM logs
Aggregation
transform: |
SELECT level, COUNT(*) as count, AVG(duration_ms) as avg_latency
FROM logs
GROUP BY level
Deployment Guide
This guide covers running logfwd in production environments.
Docker — standalone container
Build the image
docker build -t logfwd:latest .
The multi-stage Dockerfile at the repository root compiles a statically-linked
release binary and copies it into a minimal debian:bookworm-slim image.
Run with a config file
Create config.yaml:
input:
type: file
path: /var/log/app/*.log
format: json
output:
type: otlp
endpoint: otel-collector:4317
protocol: grpc
server:
diagnostics: 0.0.0.0:9090
Run the container, mounting the log directory and config:
docker run -d \
--name logfwd \
-v /var/log:/var/log:ro \
-v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
-p 9090:9090 \
logfwd:latest \
--config /etc/logfwd/config.yaml
Environment variable substitution
Pass secrets and environment-specific values via environment variables instead of baking them into the config file:
output:
type: otlp
endpoint: ${OTEL_ENDPOINT}
docker run -d \
-e OTEL_ENDPOINT=otel-collector:4317 \
-v /var/log:/var/log:ro \
-v $(pwd)/config.yaml:/etc/logfwd/config.yaml:ro \
logfwd:latest --config /etc/logfwd/config.yaml
Kubernetes — DaemonSet
A DaemonSet is the recommended way to deploy logfwd in a Kubernetes cluster. Each
node runs one logfwd pod that reads container logs from /var/log on the host.
A ready-to-use manifest is provided at deploy/daemonset.yml.
Minimal DaemonSet
apiVersion: v1
kind: Namespace
metadata:
name: collectors
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: logfwd
namespace: collectors
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logfwd-config
namespace: collectors
data:
config.yaml: |
input:
type: file
path: /var/log/pods/**/*.log
format: cri
transform: |
SELECT
level,
message,
_timestamp,
_stream
FROM logs
WHERE level != 'DEBUG'
output:
type: otlp
endpoint: ${OTEL_ENDPOINT}
protocol: grpc
compression: zstd
server:
diagnostics: 0.0.0.0:9090
log_level: info
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: logfwd
namespace: collectors
labels:
app: logfwd
spec:
selector:
matchLabels:
app: logfwd
template:
metadata:
labels:
app: logfwd
spec:
serviceAccountName: logfwd
tolerations:
- operator: Exists # run on all nodes including control-plane
containers:
- name: logfwd
image: logfwd:latest
imagePullPolicy: IfNotPresent
args:
- --config
- /etc/logfwd/config.yaml
env:
- name: OTEL_ENDPOINT
value: otel-collector.monitoring.svc.cluster.local:4317
ports:
- name: diagnostics
containerPort: 9090
resources:
requests:
cpu: "250m"
memory: "128Mi"
limits:
cpu: "1"
memory: "512Mi"
volumeMounts:
- name: varlog
mountPath: /var/log
readOnly: true
- name: config
mountPath: /etc/logfwd
readOnly: true
volumes:
- name: varlog
hostPath:
path: /var/log
- name: config
configMap:
name: logfwd-config
Apply it:
kubectl apply -f deploy/daemonset.yml
kubectl -n collectors rollout status daemonset/logfwd
Kubernetes metadata enrichment
Use the k8s_path enrichment table to attach namespace, pod, and container labels to
every log record:
enrichment:
k8s:
type: k8s_path
transform: |
SELECT
l.level,
l.message,
k.namespace,
k.pod_name,
k.container_name
FROM logs l
LEFT JOIN k8s k ON l._source_path = k.log_path_prefix
Namespace filtering
To collect logs only from specific namespaces, filter in the transform:
transform: |
SELECT l.*, k.namespace, k.pod_name, k.container_name
FROM logs l
LEFT JOIN k8s k ON l._source_path = k.log_path_prefix
WHERE k.namespace IN ('production', 'staging')
Scraping the diagnostics endpoint with Prometheus
Expose port 9090 in the pod spec and add a Prometheus scrape annotation:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
prometheus.io/path: "/api/pipelines"
OTLP collector integration
logfwd sends log records as OTLP protobuf. Any OpenTelemetry-compatible collector can receive them.
OpenTelemetry Collector
Add an `otlp` receiver to your collector config and enable the `logs` pipeline:
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
exporters:
debug:
verbosity: normal
otlphttp/loki:
endpoint: http://loki:3100/otlp
service:
pipelines:
logs:
receivers: [otlp]
exporters: [debug, otlphttp/loki]
Point logfwd at the collector:
output:
type: otlp
endpoint: otel-collector:4317
protocol: grpc
compression: zstd
Grafana Alloy / Agent
Grafana Alloy can receive OTLP logs and forward them to Loki or Tempo. Configure an
otelcol.receiver.otlp component and connect it to your exporter pipeline.
Resource sizing guidelines
logfwd is designed to process logs on a single CPU core. The pipeline runs as a set of blocking OS threads — one per input plus shared coordinator threads.
Baseline
| Scenario | CPU | Memory |
|---|---|---|
| Quiet node (< 1 MB/s) | 50 m | 64 Mi |
| Typical node (1–10 MB/s) | 250 m | 128 Mi |
| High-throughput node (> 10 MB/s) | 500 m – 1 CPU | 256 Mi |
Memory breakdown
| Component | Typical size |
|---|---|
| Arrow RecordBatch (per batch, 8 KB read) | ~512 Ki |
| DataFusion query plan | ~4 Mi |
| OTLP request buffer | ~2 Mi |
| Enrichment tables | < 1 Mi |
| Per-pipeline overhead | ~16 Mi |
Tuning tips
- Reduce memory: avoid `keep_raw: true` (disabled by default) — it stores the full JSON line and accounts for up to 65% of table memory.
- Reduce CPU: use a `WHERE` clause in the transform to drop unwanted records early.
- Multiple pipelines: each pipeline occupies its own thread. Add CPU budget proportionally.
Kubernetes resource example
resources:
requests:
cpu: "250m"
memory: "128Mi"
limits:
cpu: "1"
memory: "512Mi"
Set the limit higher than the request so logfwd can burst during log spikes without being OOM-killed.
Validating before deploy
Use --validate to parse and validate the config without starting the pipeline:
logfwd --config config.yaml --validate
Use --dry-run to build all pipeline objects without starting them (catches errors
such as SQL syntax issues):
logfwd --config config.yaml --dry-run
Both commands exit 0 on success and print an error to stderr on failure.
Docker Deployment
Quick start
docker run -v /var/log:/var/log:ro \
-v ./config.yaml:/etc/logfwd/config.yaml:ro \
ghcr.io/strawgate/memagent:latest \
--config /etc/logfwd/config.yaml
Dockerfile
The release workflow builds multi-arch images for linux/amd64 and
linux/arm64 using a distroless base image.
See Dockerfile and .github/workflows/release.yml for details.
Monitoring & Diagnostics
Enable the diagnostics server in your config:
server:
diagnostics: 0.0.0.0:9090
Endpoints
| Endpoint | Description |
|---|---|
| `GET /health` | Liveness probe (uptime, version) |
| `GET /ready` | Readiness probe (200 once initialized) |
| `GET /api/pipelines` | Detailed JSON with per-stage metrics |
| `GET /api/stats` | Flattened JSON for polling/benchmarks |
| `GET /api/config` | View active YAML configuration |
| `GET /api/logs` | View recent log lines from stderr |
| `GET /api/history` | Time-series data for dashboard charts |
| `GET /api/traces` | Detailed latency spans for recent batches |
| `GET /` | HTML dashboard |
Key metrics
| Metric | Description |
|---|---|
| `logfwd_input_lines_total` | Lines read per input |
| `logfwd_transform_lines_in` | Lines entering SQL transform |
| `logfwd_transform_lines_out` | Lines after filtering |
| `logfwd_stage_seconds_total` | Time per stage (scan, transform, output) |
| `logfwd_flush_reason_total` | Flush triggers (size vs timeout) |
OTLP metrics push
server:
metrics_endpoint: http://otel-collector:4318
metrics_interval_secs: 60
Architecture
Data flow
Sources (file / TCP / UDP / OTLP receiver)
│
│ each source produces Bytes independently
▼
Format Parser (CRI / JSON / Raw) ─per source─
│ strips CRI timestamp/stream prefix, accumulates NDJSON
▼
Scanner ─per source─
│ one pass classifies entire buffer via ChunkIndex
│ walks structural positions directly into Arrow columns
│ injects _resource_* columns from source metadata
▼
RecordBatch per source
│
├─→ [if queue.mode: pre-transform] write Arrow IPC segment → ack source
│
├─→ register as partitions of `logs` MemTable
│ (DataFusion concatenates partitions during query)
▼
SQL Transform (DataFusion)
│ user SQL: SELECT, WHERE, GROUP BY + UDFs
│ enrichment tables available via JOIN
│ _resource_* columns flow through like any other column
▼
Post-transform RecordBatch
│
├─→ [if queue.mode: post-transform] write Arrow IPC segment → ack source
│
▼
Output Sinks (OTLP / JSON lines / HTTP / stdout)
│ in-memory fan-out via bounded channels
│ group rows by _resource_* columns for OTLP ResourceLogs
Persistence
Arrow IPC is the universal segment format. Every persistence point writes Arrow IPC File format with atomic seal, and every persistence point can target local disk or object storage (S3/GCS/Azure Blob).
Queue mode is configurable per pipeline:
pipeline:
queue:
mode: pre-transform # or post-transform, or none
storage: s3://my-bucket/logfwd/pipeline-a/
max_bytes: 10GB
max_age: 24h
pre-transform: Source segments are the queue. Each source writes
its own Arrow IPC segments after scanning. Outputs replay from source
segments and re-run the transform. Raw data is re-queryable with
different SQL. One copy of data. Enrichment data may be stale on replay
(_resource_* columns are always correct; enrichment table data
reflects query-time state, not ingest-time).
post-transform: Shared post-transform queue per pipeline.
Transform runs once, result is persisted. All outputs share one queue
with independent cursors. Enrichment baked in at ingest time — correct
even on replay days later. Can’t re-query with different SQL. Second
copy of data (but usually smaller due to filtering).
none (default): Pure in-memory fan-out. Transform runs once,
RecordBatches passed to outputs via bounded channels. Batches drop on
output failure. Lowest resource usage.
End-to-end ack
Sources must NOT acknowledge to their upstream until persistence confirms durability. The ack chain:
Scanner produces RecordBatch
→ FileWriter writes to .tmp
→ fsync + rename (segment sealed)
→ Ack source: "data through offset X is durable"
→ Source advances checkpoint
Ack latency is scanner + IPC write + fsync, typically 5-15ms. Comparable to Kafka’s acks=all (3-15ms).
File sources don’t need the ack latency to be fast — the log file is already durable. The checkpoint just tracks the file offset.
Network sources see the full ack latency as their response time.
When queue.mode: none: No persistence, no ack-after-write. Source
checkpoints advance after in-memory fan-out. Data can be lost on crash.
Output delivery
Real-time path (all modes): Transform runs once per batch cycle, result fans out to all outputs via bounded channels.
Replay (when queue is enabled): If an output falls behind or
restarts, it replays from the queue. With pre-transform mode, replay
re-runs the transform. With post-transform mode, replay reads
transformed segments directly.
Cursor tracking: Each output tracks { segment_id, batch_offset }.
Persisted on each ack. On restart, resume from last position.
Retention: TTL (max_segment_age) + size (max_disk_bytes). When
either limit is hit, oldest segments are evicted regardless of cursor
positions. Outputs pointing at evicted segments see a gap (tracked in
segments_dropped metric).
Delivery semantics: At-least-once when queue is enabled.
Best-effort when queue is none.
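The retention sweep described above can be sketched with plain types (all names here are hypothetical, not logfwd's actual code): evict oldest-first until both limits hold, counting evictions that lagging cursors will observe as gaps.

```rust
// Oldest segment is first in the vector. Evict while either the total-size
// or max-age limit is violated; the count feeds the segments_dropped metric.
struct Segment {
    id: u64,
    bytes: u64,
    age_secs: u64,
}

fn evict(segments: &mut Vec<Segment>, max_bytes: u64, max_age_secs: u64) -> u64 {
    let mut dropped = 0;
    while let Some(first) = segments.first() {
        let total: u64 = segments.iter().map(|s| s.bytes).sum();
        if total > max_bytes || first.age_secs > max_age_secs {
            segments.remove(0); // O(n) is fine for a sketch
            dropped += 1;
        } else {
            break;
        }
    }
    dropped
}
```

Note that eviction ignores output cursor positions, which is exactly why a slow output can see a gap.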
Multi-source pipeline
Each source scans independently and writes its own Arrow IPC segments.
At transform time, the pipeline reads the latest segments from each
source and registers them as partitions of the logs MemTable.
DataFusion concatenates partitions during query execution.
This means:
- Each source can have a different `ScanConfig` (different `wanted_fields` from predicate pushdown)
- Schema differences between sources are handled by DataFusion’s schema merging (missing columns are null)
- The StreamingBuilder’s `Bytes` lifetime is per-source
- A source with no data in a cycle contributes no partition
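The schema-merging behavior can be illustrated with a toy union of per-source field lists (DataFusion does this internally; this sketch only shows the shape of the result):

```rust
// The unified schema is the ordered union of every partition's fields.
// A source that lacks a field contributes nulls for it at query time.
fn merge_schemas(schemas: &[&[&str]]) -> Vec<String> {
    let mut merged: Vec<String> = Vec::new();
    for schema in schemas {
        for &field in *schema {
            if !merged.iter().any(|f| f == field) {
                merged.push(field.to_string());
            }
        }
    }
    merged
}
```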
Resource metadata as columns
Source identity and resource attributes are carried as _resource_*
prefixed columns (e.g., _resource_k8s_pod_name,
_resource_k8s_namespace, _resource_service_name). These are injected
during scanning based on the source’s configuration.
This design:
- Survives SQL transforms naturally (they’re just columns)
- Persists in the Arrow IPC segments (always correct, unlike enrichment)
- Enables OTLP output to group rows by resource (group-by on `_resource_*` columns, one `ResourceLogs` per distinct combination)
- Uses dictionary encoding for efficiency (the same pod name on every row from one source costs ~one dictionary entry)
- Output sinks exclude `_resource_*` columns from the payload (same pattern as `_raw`)
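The dictionary-encoding point is easy to see in miniature. A toy encoder (not Arrow's actual `DictionaryArray` API) shows why a repeated pod name is nearly free:

```rust
use std::collections::HashMap;

// Map each distinct string to a small integer key. A column of N identical
// pod names stores one dictionary entry plus N integer keys.
fn dictionary_encode<'a>(values: &[&'a str]) -> (Vec<&'a str>, Vec<u32>) {
    let mut dict: Vec<&str> = Vec::new();
    let mut index: HashMap<&str, u32> = HashMap::new();
    let mut keys = Vec::with_capacity(values.len());
    for &v in values {
        let k = *index.entry(v).or_insert_with(|| {
            dict.push(v);
            (dict.len() - 1) as u32
        });
        keys.push(k);
    }
    (dict, keys)
}
```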
Column naming conventions
| Column | Purpose | Example |
|---|---|---|
| `{field}` | Bare name, native Arrow type | `message` (Utf8View), `status` (Int64) |
| `{field}` (conflict) | StructArray with typed children | `status: Struct { int: Int64, str: Utf8View }` |
| `_raw` | Raw input line (optional) | `_raw` |
| `_timestamp` | CRI timestamp (RFC 3339 string) | `_timestamp` |
| `_stream` | CRI stream name | `_stream` |
| `_resource_*` | Source/resource metadata | `_resource_k8s_pod_name` |
Type conflicts produce a single StructArray column with typed children,
not separate flat columns.
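A rough model of a conflict cell, assuming only int/string conflicts for brevity (the real StructArray also carries a float child and uses Arrow types, not a Rust struct):

```rust
// One logical column whose typed children hold whichever representation
// the row actually had; the other child is null for that row.
#[derive(Debug, PartialEq)]
struct ConflictCell {
    int: Option<i64>,
    str: Option<String>,
}

fn classify(raw: &str) -> ConflictCell {
    match raw.parse::<i64>() {
        Ok(v) => ConflictCell { int: Some(v), str: None },
        Err(_) => ConflictCell { int: None, str: Some(raw.to_string()) },
    }
}
```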
Arrow IPC segment format
Format: Arrow IPC File format with atomic seal.
Write path: FileWriter writes to .tmp file. On seal:
finish() (writes footer) → fsync → atomic rename to final path.
Readers never see incomplete files.
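The write path is the classic write-tmp/fsync/rename pattern. A minimal stdlib sketch (illustrative, not logfwd's `FileWriter` code; in the real path the payload is the Arrow IPC body plus footer):

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

// Write to a .tmp file, fsync, then rename. Because rename is atomic on
// POSIX filesystems, readers see either no segment or a complete one.
fn seal_segment(dir: &Path, name: &str, payload: &[u8]) -> std::io::Result<()> {
    let tmp = dir.join(format!("{name}.tmp"));
    let final_path = dir.join(name);
    let mut f = File::create(&tmp)?;
    f.write_all(payload)?;
    f.sync_all()?; // data is durable before the rename makes it visible
    fs::rename(&tmp, &final_path)?;
    Ok(())
}
```

The ack to the source ("data through offset X is durable") is only safe after `sync_all` returns.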
Segment lifecycle:
- Pipeline writes batch(es) to a `.tmp` file via `FileWriter`
- At size threshold (64 MB) or time threshold: seal segment
- On startup: delete orphaned `.tmp` files
Storage abstraction: All persistence writes go through a
SegmentStore trait with implementations for local filesystem and
object storage (S3/GCS/Azure Blob). The same segment format and code
path is used regardless of storage backend.
Deployment model
logfwd scales by running more instances. One binary, one pipeline per instance. S3 is the coordination layer between instances.
Single instance: Source → Scanner → Transform → Output, all in one process. Predicate pushdown works. In-memory fan-out. Simple.
Scaled out: Multiple instances with different configs, sharing S3 paths. Each instance is a full logfwd binary.
Instance A (edge collector):
file source → scanner → write Arrow IPC to s3://bucket/raw/
Instance B (central processor):
s3 source (reads s3://bucket/raw/) → transform → output sinks
# or: → write to s3://bucket/transformed/ for another instance
Instance C (dedicated sender):
s3 source (reads s3://bucket/transformed/) → output sinks
No custom RPC, no cluster coordination. Arrow IPC on S3 is the universal interface. Any instance can read any other’s segments.
Scanner architecture
The scanner is the performance-critical path. It has two stages:
Stage 1 — Chunk classification (chunk_classify.rs): Process the
entire NDJSON buffer in 64-byte blocks. For each block, find all quote
and backslash positions, compute an escape-aware real-quote bitmask, and
build a string-interior mask. Output: ChunkIndex with pre-computed
bitmasks.
Stage 2 — Field extraction (scanner.rs): A scalar state machine
walks top-level JSON objects. For each field, it resolves the key to an
index (HashMap, once per field per batch) and routes the value to the
builder via append_*_by_idx. String scanning uses the pre-computed
ChunkIndex for O(1) closing-quote lookup.
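The closing-quote lookup can be sketched for a single 64-byte block (the real `ChunkIndex` spans the whole buffer and handles block boundaries): mask off positions at or before the opening quote, then a `trailing_zeros` bit-scan finds the next real quote.

```rust
// Given a precomputed bitmask of real (unescaped) quote positions within a
// 64-byte block, find the first quote strictly after `after`.
fn next_quote(quote_mask: u64, after: u32) -> Option<u32> {
    let keep = if after >= 63 { 0 } else { !0u64 << (after + 1) };
    let remaining = quote_mask & keep;
    if remaining == 0 {
        None
    } else {
        Some(remaining.trailing_zeros())
    }
}
```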
The scan loop is generic over the ScanBuilder trait:
- `StreamingBuilder` (via `Scanner::scan_detached`): builds detached `StringArray` columns. For the persistence path (Arrow IPC segments).
- `StreamingBuilder` (via `Scanner::scan`): stores `(row, offset, len)` views into a `bytes::Bytes` buffer. Builds `StringViewArray` columns with zero copies. 20% faster. For the real-time hot path when persistence is disabled.
Async pipeline
The pipeline runs on a tokio multi-thread runtime. Key components:
- Sources implement `async fn run(&mut self, ctx: &mut SourceContext)` (Arroyo-style source-owns-loop). File sources wrap FileTailer via `spawn_blocking`.
- Scanner runs on `spawn_blocking` (pure CPU, ~4MB per call).
- Transform is `async fn execute()` (DataFusion is natively async).
- Sinks implement `async fn send_batch()`. HTTP-based sinks use async `reqwest` for connection pooling and timeouts.
- Shutdown via `CancellationToken` (already implemented).
Crate map
| Crate | Purpose |
|---|---|
| `logfwd` | Binary. CLI, pipeline orchestration, OTel metrics. |
| `logfwd-core` | Scanner, builders, parsers, diagnostics, enrichment, OTLP encoder. |
| `logfwd-config` | YAML config deserialization and validation. |
| `logfwd-transform` | DataFusion SQL. UDFs: `grok()`, `regexp_extract()`, `int()`, `float()`. |
| `logfwd-output` | Output sinks + async Sink trait. OTLP, JSON lines, HTTP, stdout. |
| `logfwd-bench` | Criterion benchmarks. |
What’s implemented vs not yet
Implemented: file input, CRI/JSON/Raw parsing, zero-copy scanner, two builder backends (StreamingBuilder default), DataFusion SQL transforms (async), custom UDFs (grok, regexp_extract, int, float), enrichment (K8s path, host info, static labels), OTLP output, JSON lines output, stdout output, diagnostics server, OTel metrics, signal handling (SIGINT/SIGTERM via CancellationToken), graceful shutdown, async Sink trait.
Not yet: async pipeline runtime, async Source trait, Arrow IPC
persistence (pre/post-transform), SegmentStore abstraction (local +
S3/GCS), object storage upload, _resource_* column injection, OTLP
resource grouping, output cursor tracking, TCP/UDP/OTLP input,
Elasticsearch/Loki/Parquet output, file offset checkpointing, SQL
rewriter, S3 source (for scaled-out deployment).
Zero-Copy Scanner
The scanner converts newline-delimited JSON into Apache Arrow RecordBatches using SIMD-accelerated structural classification.
How it works
- Stage 1 (SIMD): Classify the entire buffer in one pass. Find all quotes and backslashes using platform-specific SIMD: AVX2/SSE2 on x86_64, NEON on aarch64. This produces 64-bit bitmasks for O(1) string boundary lookups.
- Stage 2 (Scalar): Walk the JSON structure using the pre-computed bitmasks. Extract field names and values, detect types (int/float/string), and build Arrow columns directly.
Zero-copy mode
Scanner uses Arrow’s StringViewArray to create 16-byte views
into the input buffer. String data is never copied — the original input buffer
is shared via reference counting.
Field pushdown
The SQL transform is analyzed before scanning. If the query only references
level and message, the scanner skips extracting all other fields. On
wide data (20+ fields), this provides 2-3x throughput improvement.
Performance
| Dataset | Fields | Throughput |
|---|---|---|
| Narrow (3 fields) | 3 | 3.4M lines/sec |
| Simple (6 fields) | 6 | 2.0M lines/sec |
| Wide (20 fields) | 20 | 560K lines/sec |
| Wide (2 fields projected) | 20→2 | 1.4M lines/sec |
Performance
Throughput targets
logfwd targets 1M+ lines/sec on a single CPU core with CRI parsing and OTLP encoding.
Benchmark results
| Stage | Time (100K lines) | % of total |
|---|---|---|
| Scan (JSON→Arrow) | 21ms | 57% |
| Transform (SQL) | ~0ms | ~0% |
| OTLP encode | 9ms | 27% |
| zstd compress | 6ms | 16% |
| Total CPU | 36ms | 2.8M lines/sec |
Key optimizations
- SIMD structural indexing: Classifies JSON structure in one vectorized pass
- Zero-copy StringViewArray: No string copies in the hot path
- Field pushdown: Scanner skips unused fields based on SQL analysis
- Persistent zstd context: Compression context reused across batches
- Connection pooling: HTTP agent reused for output requests
- Block-in-place output: Overlaps I/O with scanning via tokio
Memory profile
At our default 4MB batch size (~23K lines):
- Arrow RecordBatch overhead: ~2MB
- Input buffer: 4MB (shared with StringView columns)
- Total per batch: ~6MB
For 1M lines (stress test only):
- Real RSS: ~205MB (not 926MB as `get_array_memory_size()` reports)
- StringViewArray shares the input buffer across all string columns
Troubleshooting
This guide helps you diagnose and fix common problems with logfwd.
Common error messages
config validation error: pipeline '...' has no inputs
The named pipeline has an empty `inputs` list (or no `input` key in the simple layout).
# Wrong — no inputs
pipelines:
app:
outputs:
- type: stdout
# Fixed
pipelines:
app:
inputs:
- type: file
path: /var/log/app/*.log
outputs:
- type: stdout
config validation error: file input requires 'path'
A file input is missing the required path glob.
# Wrong
input:
type: file
# Fixed
input:
type: file
path: /var/log/app/*.log
config validation error: otlp output requires 'endpoint'
An otlp, http, elasticsearch, or loki output is missing the required
endpoint field.
# Wrong
output:
type: otlp
# Fixed
output:
type: otlp
endpoint: otel-collector:4317
config validation error: cannot mix top-level input/output with pipelines
You used both the simple layout (top-level input/output) and the advanced layout
(pipelines map) in the same file. Choose one.
config YAML error: ...
The YAML is malformed. Common causes:
- Indentation errors (YAML requires consistent spaces, not tabs).
- Missing quotes around values that contain special characters like `:`.
- Multi-line SQL not using a block scalar (`|`):
# Fragile — inline SQL breaks YAML if it contains a colon
transform: SELECT level FROM logs WHERE level = 'ERROR'
# Fixed — use block scalar
transform: |
SELECT level FROM logs WHERE level = 'ERROR'
error sending to OTLP endpoint: connection refused
logfwd cannot reach the configured OTLP collector.
- Verify the endpoint address and port are correct.
- Check that the collector is running: `curl -v http://otel-collector:4318/v1/logs`
- If using gRPC (`protocol: grpc`), ensure port 4317 is open; for HTTP use 4318.
- In Kubernetes, verify the service name resolves from within the pod: `kubectl exec -n collectors <pod> -- nslookup otel-collector`
error watching path: No such file or directory
The glob pattern in a file input matched no files and the base directory does not
exist. Ensure the directory is mounted and the pattern is correct.
error watching path: permission denied
logfwd cannot read the log directory. In Kubernetes, confirm the varlog volume
is mounted with readOnly: true at /var/log and that the container user has read
access.
Diagnosing dropped or missing data
Step 1 — Check transform filter_drop_rate
Call the /api/pipelines endpoint (see Reading /api/pipelines
below). Look at the transform.filter_drop_rate field. A value close to 1.0 means
almost all records are being dropped by your WHERE clause.
Example: you intended to keep errors but accidentally wrote a filter that matches nothing:
-- Typo — 'error' vs 'ERROR'
WHERE level = 'error'
Fix: adjust the WHERE clause or remove it temporarily to confirm records are flowing.
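The `filter_drop_rate` value is derived from the two transform counters, as in this sketch of the formula:

```rust
// filter_drop_rate = 1 - (lines_out / lines_in), guarding against an
// empty input to avoid a division by zero.
fn filter_drop_rate(lines_in: u64, lines_out: u64) -> f64 {
    if lines_in == 0 {
        0.0
    } else {
        1.0 - (lines_out as f64 / lines_in as f64)
    }
}
```

With the example counters shown later in this guide (1,024,000 in, 2,048 out), the rate is 0.998: almost everything is filtered.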
Step 2 — Check input and output line counts
In /api/pipelines, compare:
- `inputs[*].lines_total` — lines read from source
- `transform.lines_in` — lines entering the transform stage
- `transform.lines_out` — lines leaving the transform stage
- `outputs[*].lines_total` — lines successfully sent
If inputs[*].lines_total is zero, logfwd is not reading any files. Check the
glob pattern and confirm new lines are being appended to the files.
If outputs[*].errors is non-zero, there are delivery failures. Check the logs
for error sending messages.
Step 3 — Verify file tailing
Use --dry-run to confirm the file input starts without error:
logfwd --config config.yaml --dry-run
Enable debug logging to see file discovery and tail events:
server:
log_level: debug
Look for log lines like:
[DEBUG] tailing /var/log/pods/app_pod-xyz/app/0.log
[DEBUG] read 4096 bytes from /var/log/pods/app_pod-xyz/app/0.log
If files appear in the log but no records reach the output, the format processing
(FramedInput) may be discarding lines. Switch to format: raw temporarily to
confirm raw lines are flowing:
input:
type: file
path: /var/log/app/*.log
format: raw
Step 4 — Check for output errors
If outputs[*].errors is increasing, look at stderr for the specific error. Common
causes:
| Symptom | Cause | Fix |
|---|---|---|
| `connection refused` | Collector is down or unreachable | Check network connectivity and collector health |
| `413 Request Entity Too Large` | Batch too large | Reduce batch size (future config option) |
| `401 Unauthorized` | Missing auth token | Add Authorization header support (not yet implemented) |
| Slow `output_s` in `stage_seconds` | Network latency to collector | Use compression (`compression: zstd`) or move collector closer |
Reading /api/pipelines
Enable the diagnostics server:
server:
diagnostics: 0.0.0.0:9090
Then query it:
curl -s http://localhost:9090/api/pipelines | jq .
Response schema
{
"pipelines": [
{
"name": "default",
"inputs": [
{
"name": "pod_logs",
"type": "file",
"lines_total": 1024000,
"bytes_total": 204800000,
"errors": 0
}
],
"transform": {
"sql": "SELECT * FROM logs WHERE level = 'ERROR'",
"lines_in": 1024000,
"lines_out": 2048,
"errors": 0,
"filter_drop_rate": 0.998
},
"outputs": [
{
"name": "collector",
"type": "otlp",
"lines_total": 2048,
"bytes_total": 512000,
"errors": 0
}
],
"batches": {
"total": 512,
"avg_rows": 4.0,
"flush_by_size": 500,
"flush_by_timeout": 12,
"dropped_batches_total": 0,
"scan_errors_total": 0,
"last_batch_time_ns": 1712160000000000000
},
"stage_seconds": {
"scan": 1.234567,
"transform": 0.012345,
"output": 0.456789
}
}
],
"system": {
"uptime_seconds": 3600,
"version": "0.1.0"
}
}
Key fields
| Field | Description |
|---|---|
| `inputs[*].lines_total` | Total log lines read from this input since start. |
| `inputs[*].bytes_total` | Total bytes read from this input since start. |
| `inputs[*].errors` | Total read errors (file open failures, etc.). |
| `transform.lines_in` | Lines entering the SQL transform. |
| `transform.lines_out` | Lines produced by the SQL transform after filtering. |
| `transform.filter_drop_rate` | Fraction of input lines dropped: `1 - (lines_out / lines_in)`. |
| `outputs[*].lines_total` | Lines successfully delivered to this output. |
| `outputs[*].errors` | Delivery errors (network failures, HTTP errors, etc.). |
| `batches.total` | Total Arrow batches processed. |
| `batches.avg_rows` | Average rows per batch. |
| `batches.flush_by_size` | Batches flushed because they reached the row limit. |
| `batches.flush_by_timeout` | Batches flushed because the timeout expired. |
| `batches.dropped_batches_total` | Batches dropped due to backpressure or errors. |
| `batches.scan_errors_total` | Scanner errors (malformed input, etc.). |
| `batches.last_batch_time_ns` | Unix timestamp (ns) of last processed batch. |
| `stage_seconds.scan` | Total CPU time spent in the scanner. |
| `stage_seconds.transform` | Total CPU time spent in DataFusion SQL. |
| `stage_seconds.output` | Total time spent in the output sink (includes network wait). |
| `system.uptime_seconds` | Seconds since logfwd started. |
Debug mode / increasing log verbosity
Set log_level in the server block:
server:
log_level: debug
Available levels (least to most verbose): error, warn, info, debug, trace.
logfwd writes all log output to stderr. Redirect it to a file if needed:
logfwd --config config.yaml 2>logfwd.log
In Kubernetes:
kubectl -n collectors logs daemonset/logfwd
What each level shows
| Level | What you see |
|---|---|
| `error` | Fatal errors only. |
| `warn` | Recoverable problems (e.g. failed delivery, retrying). |
| `info` | Pipeline start/stop, file discovery, batch flush summaries. |
| `debug` | Per-file tail events, batch sizes, SQL plan details. |
| `trace` | Per-record scanner output, individual JSON field extraction. |

Warning: `trace` level generates very large output on busy nodes. Use only for short debugging sessions.
Validating configuration
# Check syntax and field types only (fast)
logfwd --config config.yaml --validate
# Build full pipeline objects including SQL parsing (slower, catches SQL errors)
logfwd --config config.yaml --dry-run
Both commands print a success message or a detailed error to stderr and exit without starting any pipelines.
Checking the SQL transform
Use --dry-run to catch SQL syntax errors:
logfwd --config config.yaml --dry-run
# error: SQL error: Execution error: column "leve" not found
Test your SQL against a sample file using the stdout output:
input:
type: file
path: /path/to/sample.log
format: json
transform: |
SELECT level, message FROM logs WHERE status >= 500
output:
type: stdout
format: console
Run once and inspect the output:
logfwd --config test.yaml
Performance issues
High CPU usage
- Check `stage_seconds` in `/api/pipelines`. If `transform` dominates, simplify the SQL query, or move filtering into the `WHERE` clause so predicate pushdown drops records earlier.
- If `scan` dominates, check the input volume with `inputs[*].bytes_total / system.uptime_seconds`. logfwd processes ~1.7 GB/s on a single core; sustained CPU near 100% on a fast input is expected.
High memory usage
- Confirm `keep_raw: false` (the default). Setting `keep_raw: true` stores the full JSON line and can double or triple memory consumption.
- Check the number of unique field names across your log lines. Each unique field produces at least one Arrow column; very wide schemas use more memory per batch.
Records accumulating / not being shipped
Check outputs[*].errors — if non-zero, delivery is failing and records are being
discarded. Enable log_level: warn to see delivery error messages.
Check batches.flush_by_timeout vs batches.flush_by_size. If nearly all flushes
are timeout-driven, the input rate is low and latency is bounded by the flush timeout
(expected behaviour).
Developing logfwd
Workspace layout
crates/
logfwd/ Binary crate. CLI, async pipeline orchestration.
logfwd-core/ Proven kernel. Scanner, parsers, pipeline state machine, OTLP encoding. no_std.
logfwd-arrow/ Arrow integration. ScanBuilder impls, SIMD backends, RecordBatch builders.
logfwd-config/ YAML config parsing and validation.
logfwd-io/ I/O layer. File tailing, TCP/UDP/OTLP inputs, checkpointing, diagnostics.
logfwd-transform/ DataFusion SQL transforms, UDFs (grok, regexp_extract, geo_lookup).
logfwd-output/ Output sinks (OTLP, Elasticsearch, Loki, JSON lines, stdout).
logfwd-bench/ Criterion benchmarks for the scanner pipeline.
logfwd-competitive-bench/ Comparative benchmarks vs other log agents.
logfwd-test-utils/ Shared test utilities.
logfwd-ebpf-proto/ eBPF log capture protocol definitions (experimental).
Build, test, lint, bench, fuzz
just test # All tests
just lint # fmt + clippy + toml + deny + typos
cargo test -p logfwd-core # Core crate only (fastest iteration)
RUSTFLAGS="-C target-cpu=native" cargo bench --bench scanner -p logfwd-core
cd crates/logfwd-core && cargo +nightly fuzz run scanner -- -max_total_time=300
Compile caching with sccache
sccache caches Rust compilation artefacts to speed up builds. The project is configured to use it (.cargo/config.toml sets rustc-wrapper = "sccache").
CI and Copilot agents: sccache is installed automatically — no action needed.
Local development: install sccache once:
cargo install sccache --locked
After that, every cargo build / cargo test / just clippy will use the cache automatically via the project’s .cargo/config.toml.
To temporarily disable sccache (e.g. for debugging):
RUSTC_WRAPPER="" cargo build
Local CPU profiling (macOS)
The cpu-profiling feature works locally on macOS, but the shutdown path matters:
the profiled logfwd process must receive SIGTERM directly so it can build and
write flamegraph.svg before exiting.
The easiest way to run the full File -> OTLP path locally is:
just profile-otlp-local
This recipe:
- builds `logfwd` with `--features cpu-profiling`
- generates a JSON input file
- starts a local OTLP blackhole on a fresh port
- runs `logfwd` with a file input and OTLP output
- sends `SIGTERM` to the real `logfwd` child process after a short run
- leaves a temp directory containing `config.yaml`, `logs.json`, `pipeline.log`, `blackhole.log`, and `flamegraph.svg`
Useful variants:
just profile-otlp-local 1000000 10
Caveats:
- Avoid reusing a diagnostics port from another local run; the helper recipe omits diagnostics entirely to keep the profile loop simple.
- If the `cpu-profiling` release build fails with `No space left on device`, run `RUSTC_WRAPPER= cargo clean` and retry. The profiled release build is large because `release` keeps debug info for flamegraphs.
- Killing a wrapper shell is not sufficient; the `SIGTERM` must reach the actual `logfwd` process.
Things that will bite you
Hard-won lessons from building the scanner and builder pipeline.
See also dev-docs/ARCHITECTURE.md for pipeline data flow.
The deferred builder pattern exists because incremental null-padding is broken
StreamingBuilder collects (row, value) records during scanning and bulk-builds Arrow columns at finish_batch. This seems roundabout — why not write directly to Arrow builders?
Because maintaining column alignment across multiple type builders (str, int, float) per field is a coordination nightmare. When you write an int, you must pad the str and float builders with null. When end_row fires, pad all unwritten fields. When a new field appears mid-batch, back-fill all prior rows. We tried this (IndexedBatchBuilder); proptest found column length mismatches on multi-line NDJSON with varying field sets.
The deferred pattern is correct by construction: each column is built independently. Gaps are nulls. Columns can never mismatch.
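The "correct by construction" claim boils down to this: each column is bulk-built from its collected `(row, value)` pairs against the final row count, so gaps become nulls automatically. A single-type sketch (the real builders handle str/int/float per field):

```rust
// Build one column from deferred (row, value) records. Rows with no
// recorded value for this field come out as None; length always equals
// num_rows, so columns can never misalign.
fn build_column(pairs: &[(usize, i64)], num_rows: usize) -> Vec<Option<i64>> {
    let mut col = vec![None; num_rows];
    for &(row, v) in pairs {
        col[row] = Some(v);
    }
    col
}
```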
Chunk-level SIMD classification beats per-line SIMD
We tried three approaches:
1. Per-line SIMD: load 16 bytes, compare for `"` and `\`. Slower than scalar on short strings.
2. sonic-rs DOM: SIMD JSON parser builds a DOM per line. The DOM allocation is the bottleneck.
3. Chunk-level classification (`StructuralIndex`): one portable SIMD pass (via the `wide` crate) over the entire buffer, detecting 10 structural characters simultaneously. Then `scan_string` is a single `trailing_zeros` bit-scan.
Approach 3 wins everywhere because classification is amortized across all strings and per-string lookup is O(1).
The prefix_xor escape detection has a subtle correctness requirement
The simdjson prefix_xor algorithm detects escaped quotes by computing backslash-run parity. It works for consecutive backslashes (\\\" = escaped quote). But for non-consecutive backslashes like \n\", prefix_xor gives wrong results because it counts ALL backslashes, not per-run.
Our implementation iterates each backslash: mark next byte as escaped, skip escaped backslashes. Fast because most JSON has zero or few backslashes. The carry between 64-byte blocks must be handled.
The scanner assumes UTF-8 input
The scanner and builders use `from_utf8_unchecked` throughout. JSON is UTF-8 by spec, so this holds in practice. But the scanner does NOT validate — non-UTF-8 input is undefined behavior. The fuzz target guards against this; production code currently doesn’t. See issue #76.
HashMap field lookup was 60% of total scan time
Profiling showed `get_or_create_field` dominating: a SipHash + probe per field per row. The fix: `resolve_field` does the HashMap lookup once per batch, and subsequent rows use the returned index directly. The `ScanBuilder` trait’s `resolve_field` + `append_*_by_idx` pattern encodes this.
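A minimal sketch of the pattern (hypothetical `FieldTable`; the method names mirror the trait described above but the struct is illustrative):

```rust
use std::collections::HashMap;

/// Resolves each field name to a column index once per batch; the per-row
/// hot path then appends by index with no hashing at all.
struct FieldTable {
    indices: HashMap<String, usize>,
    columns: Vec<Vec<String>>,
}

impl FieldTable {
    fn new() -> Self {
        Self { indices: HashMap::new(), columns: Vec::new() }
    }

    /// One HashMap lookup (SipHash + probe) per field per batch — the cost
    /// that dominated the profile when paid per row.
    fn resolve_field(&mut self, name: &str) -> usize {
        if let Some(&idx) = self.indices.get(name) {
            return idx;
        }
        let idx = self.columns.len();
        self.indices.insert(name.to_string(), idx);
        self.columns.push(Vec::new());
        idx
    }

    /// Hot path: a direct Vec index, no hashing.
    fn append_str_by_idx(&mut self, idx: usize, value: &str) {
        self.columns[idx].push(value.to_string());
    }
}
```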
StringViewArray memory reporting is misleading
Arrow’s get_array_memory_size() counts the backing buffer for every column sharing it. If 5 string columns point into the same buffer, reported memory is 5x actual. The StreamingBuilder produces shared-buffer columns; memory reports overcount significantly.
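The overcount is easy to reproduce with any shared buffer; a std-only illustration (hypothetical `naive_total`, not the Arrow API) of accounting that charges the full backing buffer to every column referencing it:

```rust
use std::sync::Arc;

/// Naive per-column accounting: charges the whole backing buffer to every
/// column that points into it, mirroring how per-array memory reporting
/// overcounts shared buffers.
fn naive_total(columns: &[Arc<Vec<u8>>]) -> usize {
    columns.iter().map(|buf| buf.len()).sum()
}
```

Five columns sharing one 1 KiB buffer report 5 KiB; actual usage is 1 KiB plus view metadata.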
Arrow IPC compression is just a flag
Compressed Arrow IPC is `StreamWriter` with `IpcWriteOptions::try_with_compression(Some(CompressionType::ZSTD))`. Any `RecordBatch` can be compressed. No special builder needed.
keep_raw costs 65% of table memory
The _raw column stores the full JSON line. Larger than all other columns combined. Default is keep_raw: false.
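Enabling it is an explicit opt-in. A hypothetical config fragment (key placement is illustrative, not the real schema; see the Configuration Reference):

```yaml
# Illustrative only: keep_raw defaults to false. Enabling it stores the
# full JSON line in the _raw column, which costs more memory than all
# other columns combined.
keep_raw: true
```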
Always use just clippy, never bare cargo clippy
CI runs `cargo clippy -- -D warnings` (all warnings are errors). Bare `cargo clippy` only shows warnings, so code that looks clean locally fails in CI. The `just clippy` recipe matches CI exactly. Additionally, conditional SIMD compilation means warnings differ between aarch64 (macOS) and x86_64 (CI Linux).
proptest finds bugs unit tests can’t
Every time we thought the scanner was correct, proptest broke it: escapes crossing 64-byte boundaries, fields in different orders, duplicate keys with different types. Run with `PROPTEST_CASES=2000` at minimum.
Oracle tests compare against sonic-rs as ground truth. Our scanner does first-writer-wins for duplicate keys; sonic-rs does last-writer-wins. Both are valid per RFC 8259, so oracle tests skip duplicate-key inputs.
Two scan modes serve different purposes
- `Scanner::scan_detached`: produces self-contained `StringArray` columns. The input buffer can be freed. For persistence and compression.
- `Scanner::scan`: zero-copy `StringViewArray` views into a `bytes::Bytes` buffer. 20% faster, but the buffer must stay alive. For real-time query-then-discard.
Both use the same StreamingBuilder (which implements ScanBuilder), sharing the generic scan_streaming() loop.
Building & Testing
Prerequisites
# Install Rust stable
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Install task runner
cargo install just
# Install development tools
just install-tools
Common commands
just ci # Run full CI suite (lint + test)
just fmt # Format code
just clippy # Run lints
just test # Run all tests
just bench # Run Criterion microbenchmarks
just build # Build release binary
Project structure
crates/logfwd/ # Binary entry point, CLI, pipeline orchestrator
crates/logfwd-core/ # Scanner, file tailer, CRI parser, diagnostics
crates/logfwd-config/ # YAML config parser
crates/logfwd-output/ # Output sinks (OTLP, HTTP, stdout)
crates/logfwd-transform/ # SQL transforms via DataFusion, UDFs
crates/logfwd-bench/ # Benchmarks (Criterion + exploratory)
Benchmarking
Criterion microbenchmarks
just bench
Measures scanner throughput, OTLP encoding speed, and compression performance.
Competitive benchmarks
Compare logfwd against vector, fluent-bit, filebeat, otelcol, and vlagent:
# Binary mode (local dev)
just bench-competitive --lines 1000000 --scenarios passthrough,json_parse,filter
# Docker mode (resource-limited)
just bench-competitive --lines 5000000 --docker --cpus 1 --memory 1g --markdown
Exploratory profiling
# Stage-by-stage profile
cargo run -p logfwd-bench --release --bin e2e-profile
# Memory analysis
cargo run -p logfwd-bench --release --bin sizes
# Real RSS measurement
cargo run -p logfwd-bench --release --bin rss
Nightly benchmarks
Results are published to GitHub Pages automatically via the nightly benchmark workflow. View at: strawgate.github.io/memagent/bench/