Skip to main content

Messaging & Storage

The Defense Center is built around a Kafka pipeline that feeds OpenSearch, plus a set of stores the backend uses directly. This page describes each layer and the topics/indices that tie them together.


The Kafka pipeline

Kafka (broker)

Single-node Kafka in KRaft mode (no ZooKeeper). Internal listener broker:29092. Holds the pipeline topics:

TopicProduced byConsumed by
sensor_eventssensor-apievent-stream-aggr
snort_alertsevent-stream-aggrlogstash
ravenxcope_threat_geo_eventslogstashbackend threat-map consumer

Schema Registry

schema-registry (:8081) stores the Avro schemas used on sensor_events. The data collector registers/uses these schemas when producing.

Event Stream Aggregator

event-stream-aggr is configured via env: INPUT_KAFKA_TOPIC=sensor_events, OUTPUT_KAFKA_TOPIC=snort_alerts, KAFKA_BROKERS=broker:29092, SCHEMA_REGISTRY_URL. It consumes raw events, performs geo/IP enrichment, and republishes aggregated alerts. See Event Aggregator.

Logstash

opensearch-logstash runs conf/pipeline.conf:

  • input — Kafka topic snort_alerts.
  • filter — extracts tenant_id from the Kafka message header and stamps received_opensearch_at.
  • output — indexes into OpenSearch index mataelang-sensor-events-%{tenant_id} and produces geo events to ravenxcope_threat_geo_events for the live threat map.

Storage layer

OpenSearch

opensearch-node1 (:9200) stores enriched alert documents in per-tenant indices mataelang-sensor-events-<tenant_id>. The backend reads analytics from OpenSearch (OpenSearch__Url, OpenSearch__IndexName). opensearch-init bootstraps templates/indices once on startup.

PostgreSQL

postgres (:17) is the relational source of truth: tenants, organizations, sensors, virtual sensors, users, roles, claim codes, and NATS account bookkeeping. The backend auto-migrates on startup (Database__AutoMigrate=true).

Redis

redis (redis-stack) is used by the backend for caching and ephemeral state.

InfluxDB

influxdb (:8086) stores time-series data the sensor heartbeat produces: online status (sensor_status) and host metrics (sensor_metrics — CPU, memory, interface traffic). See Sensor → Heartbeat & Configuration.


What each store is for (quick reference)

StoreBackend config prefixHolds
OpenSearchOpenSearch__*Enriched alerts / analytics
PostgreSQLPostgresqlSettings__*Relational entities
RedisRedisSettings__*Cache / ephemeral state
InfluxDBInfluxDb__*Host metrics + heartbeat status
KafkaThreatMap__Kafka__* (consumer side)Event pipeline