
Gravity - Materialized Views

Gravity turns NATS JetStream into a queryable StreamDB. Build materialized views from event streams with filters and reducers.

NATS streams are write-optimized append logs, not query engines. Gravity adds the missing query layer.

NATS streams index messages by sequence number, timestamp, and subject, but not by payload content. Filtering by content therefore means reading every message:

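// ❌ Slow: Must scan 10K messages to find filtered results
sub, err := js.SubscribeSync("events.>") // js is a legacy nats.JetStreamContext here
if err != nil {
	log.Fatal(err)
}
for i := 0; i < 10000; i++ {
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		break // timed out: no more messages
	}
	_ = msg // Parse, filter, aggregate...
}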

A common misconception is that NATS delivery policies like DeliverLastPerSubject provide fast query access. They don’t - they’re for consumers, not queries:

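// ❌ Still slow - sequential consumption, not random access
func LastPerSubject(filters []string) (map[string][][]byte, error) {
	consumer, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
		DeliverPolicy:  jetstream.DeliverLastPerSubjectPolicy,
		FilterSubjects: filters, // this policy needs a subject filter
		AckPolicy:      jetstream.AckNonePolicy,
	})
	if err != nil {
		return nil, err
	}
	result := map[string][][]byte{}
	for {
		batch, err := consumer.Fetch(20000, jetstream.FetchMaxWait(time.Second))
		if err != nil {
			return nil, err
		}
		n := 0
		for msg := range batch.Messages() {
			result[msg.Subject()] = append(result[msg.Subject()], msg.Data()) // All in memory!
			n++
		}
		if n == 0 { // nothing left to fetch
			return result, nil // 200-600ms, can't filter, no Top N
		}
	}
}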

Problems:

  • Sequential only - must consume one at a time
  • No content filtering without parsing every message
  • No aggregations (Top N, Sum, Count)
  • Loads all into memory
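
The problems above all come from doing the work at read time. A materialized view moves that work to write time: each event is filtered and folded into a small in-memory structure as it arrives, so a query becomes a map lookup. The following is a minimal standard-library sketch of that idea, not Gravity's implementation; the `event` type, the `lastActive` view, and the "drop on non-active status" rule are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a simplified stand-in for a decoded stream message (hypothetical).
type event struct {
	Subject string
	Status  string
	Score   float64
}

// lastActive is a toy materialized view: it keeps the latest event per
// subject, but only for events that pass a content filter.
type lastActive struct {
	bySubject map[string]event
}

func newLastActive() *lastActive {
	return &lastActive{bySubject: make(map[string]event)}
}

// apply is called once per incoming event, so the filtering cost is paid
// at write time instead of on every query.
func (v *lastActive) apply(e event) {
	if !strings.HasPrefix(e.Subject, "user.") {
		return // subject filter
	}
	if e.Status != "active" {
		delete(v.bySubject, e.Subject) // content filter: drop the stale entry
		return
	}
	v.bySubject[e.Subject] = e
}

// get is a plain map lookup; no stream scan is needed.
func (v *lastActive) get(subject string) (event, bool) {
	e, ok := v.bySubject[subject]
	return e, ok
}

func main() {
	v := newLastActive()
	for _, e := range []event{
		{"user.1", "active", 10},
		{"user.2", "active", 20},
		{"user.1", "active", 30},
		{"user.2", "banned", 20},
		{"order.9", "active", 1},
	} {
		v.apply(e)
	}
	e, ok := v.get("user.1")
	fmt.Println(e.Score, ok, len(v.bySubject)) // prints: 30 true 1
}
```

The trade-off is memory: the view holds one entry per matching subject, which is why bounded reducers matter for long-lived views.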

Note: For single-subject queries, use stream.GetLastMsgForSubject("user.123") - that’s fast!

// ✅ Fast: Pre-materialized view with ~1µs reads
view, _ := grav.From("events").Subject("user.>").Top("score", 10).Build(ctx)
err := gravity.Scan(ctx, view, func(u User) error { ... }) // Instant!

import (
	"context"
	"fmt"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"

	"github.com/moonwalker/orbital/internal/infra/gravity"
)

ctx := context.Background()
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := jetstream.New(nc)
grav, _ := gravity.NewManager(js)
defer grav.Close()

// Build a view
view, _ := grav.From("events").
	Subject("user.>").
	As("active_users").
	Top("score", 10).
	Build(ctx)

// Iterate with automatic unmarshaling
err := gravity.Scan(ctx, view, func(user User) error {
	fmt.Println(user.Name)
	return nil
})

func NewManager(js jetstream.JetStream, opts ...Option) (*Manager, error)
func (m *Manager) From(stream string) *Builder
func (m *Manager) DeleteView(ctx context.Context, viewName string) error
func (m *Manager) Close() error

// With namespace for multi-brand deployments
grav, _ := gravity.NewManager(js, gravity.WithNamespace("dreamz"))

func (v *View) Get(ctx context.Context, key string, dest any) error
func (v *View) Iterate(ctx context.Context) (<-chan *Entry, <-chan error)
func (v *View) Name() string

// Works with both View (cached) and Builder (streaming)
func Scan[T any](ctx context.Context, s Scanner, fn func(T) error) error

type Entry struct {
	Key   string
	Value []byte
}

grav.From("stream")           // Start builder
    .Subject("pattern.>")     // Filter subjects
    .Subjects("a.>", "b.>")   // Multiple patterns
    .As("view_name")          // Explicit name
    .Suffix("readonly")       // Add suffix to auto-name
    .Where(filterFn)          // Filter events
    .Last()                   // Keep last per subject (default)
    .First()                  // Keep first per subject
    .Count()                  // Count events
    .Top("field", 10)         // Top N by field
    .Bottom("field", 10)      // Bottom N by field
    .Sum("field")             // Sum numeric field
    .Avg("field")             // Average numeric field
    .GroupBy("field")         // Group by field
    .Merge()                  // Merge all fields
    .Build(ctx)               // → *View (cached)
    .Open(ctx)                // → *Cursor (streaming)

The unified gravity.Scan function works with both *View and *Cursor:

// Cached view - Build() creates materialized view (~1µs per iteration)
view, _ := grav.From("events").Subject("user.>").Build(ctx)
err := gravity.Scan(ctx, view, func(user User) error {
	fmt.Printf("Processing: %s\n", user.Name)
	return nil
})

// Streaming cursor - Open() streams with O(1) memory
cursor, _ := grav.From("events").Subject("user.>").Open(ctx)
err = gravity.Scan(ctx, cursor, func(user User) error { // reuse err: it is already declared
	return nil
})
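
To make the "one Scan for both sources" idea concrete, here is a rough sketch of how a generic scan over a channel-based iterator could be written. It reuses the `Entry` shape and the `Iterate` signature listed in the API reference; the `Scanner` interface body, the JSON encoding of values, and the `sliceScanner` demo source are assumptions, since the source does not show them.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Entry mirrors the Entry type from the API reference.
type Entry struct {
	Key   string
	Value []byte
}

// Scanner is an assumed interface: anything that can stream entries.
type Scanner interface {
	Iterate(ctx context.Context) (<-chan *Entry, <-chan error)
}

// sliceScanner is a hypothetical in-memory Scanner used only for this demo.
type sliceScanner []Entry

func (s sliceScanner) Iterate(ctx context.Context) (<-chan *Entry, <-chan error) {
	entries := make(chan *Entry)
	errs := make(chan error, 1)
	go func() {
		defer close(entries)
		defer close(errs)
		for i := range s {
			select {
			case entries <- &s[i]:
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
	}()
	return entries, errs
}

// Scan decodes each entry's JSON value into T and passes it to fn.
func Scan[T any](ctx context.Context, s Scanner, fn func(T) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops the producer if we return early
	entries, errs := s.Iterate(ctx)
	for e := range entries {
		var v T
		if err := json.Unmarshal(e.Value, &v); err != nil {
			return fmt.Errorf("decode %q: %w", e.Key, err)
		}
		if err := fn(v); err != nil {
			return err
		}
	}
	return <-errs // nil when the producer finished cleanly
}

type User struct {
	Name string `json:"name"`
}

// names runs Scan over src and collects the decoded user names (demo helper).
func names(src Scanner) ([]string, error) {
	var out []string
	err := Scan(context.Background(), src, func(u User) error {
		out = append(out, u.Name)
		return nil
	})
	return out, err
}

func main() {
	got, err := names(sliceScanner{
		{Key: "user.1", Value: []byte(`{"name":"ada"}`)},
		{Key: "user.2", Value: []byte(`{"name":"lin"}`)},
	})
	fmt.Println(got, err) // prints: [ada lin] <nil>
}
```

Because the callback only ever sees one decoded value at a time, the same function works whether the source is a cached view or a streaming cursor.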

| Method | Returns | Memory | Speed | Use Case |
|---|---|---|---|---|
| Build(ctx) | *View | O(n) | ~70µs/100 items | Hot paths, repeated queries |
| Open(ctx) | *Cursor | O(1) | ~1.3ms/100 items | Ad-hoc queries, unknown size |

Decision guide:

  • < 10 queries → use Open() (cursor)
  • 10-30 queries → depends on data size: per the benchmarks, break-even is about 10 queries at 100 items and about 30 at 10,000 items
  • > 30 queries → use Build() (view)
  • Hot path / continuous → always Build()
  • One-off / ad-hoc → always Open()

Views are live, not snapshots. After Build() returns, the consumer continues running in the background:

// Build once at startup
leaderboard, _ := grav.From("game").Subject("scores.>").Top("score", 100).Build(ctx)

// Consumer keeps running, updating cache as new scores arrive
for {
	gravity.Scan(ctx, leaderboard, func(s Score) error {
		// Always the latest top 100 - updated in real-time
		return nil
	})
	time.Sleep(time.Second)
}
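
The "live view" behaviour can be pictured as a background goroutine that applies updates to a cache while readers take consistent snapshots. The sketch below is an assumption about the general pattern, not Gravity's code; `liveView`, `update`, and `run` are hypothetical names, and a channel stands in for the NATS consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// update is a hypothetical decoded score event.
type update struct {
	player string
	score  float64
}

// liveView is a cache that a background consumer keeps up to date.
type liveView struct {
	mu     sync.RWMutex
	scores map[string]float64
}

func newLiveView() *liveView {
	return &liveView{scores: make(map[string]float64)}
}

// consume applies updates until the channel is closed, then signals done.
// In a real system this would be the long-running stream consumer.
func (v *liveView) consume(updates <-chan update, done chan<- struct{}) {
	for u := range updates {
		v.mu.Lock()
		v.scores[u.player] = u.score
		v.mu.Unlock()
	}
	close(done)
}

// snapshot copies the current state under a read lock, so readers never
// observe a half-applied update.
func (v *liveView) snapshot() map[string]float64 {
	v.mu.RLock()
	defer v.mu.RUnlock()
	out := make(map[string]float64, len(v.scores))
	for k, s := range v.scores {
		out[k] = s
	}
	return out
}

// run feeds updates through a background consumer and returns the final state.
func run(us []update) map[string]float64 {
	v := newLiveView()
	updates := make(chan update)
	done := make(chan struct{})
	go v.consume(updates, done)
	for _, u := range us {
		updates <- u
	}
	close(updates)
	<-done // wait until every update has been applied
	return v.snapshot()
}

func main() {
	final := run([]update{{"ada", 10}, {"lin", 7}, {"ada", 15}})
	fmt.Println(final["ada"], final["lin"]) // prints: 15 7
}
```

A read-write mutex fits this access pattern because scans are frequent and short while writes arrive one event at a time.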

Safe for Build() (bounded data):

// ✅ Top N - bounded to 100 entries max
grav.From("game").Subject("scores.>").Top("score", 100).Build(ctx)
// ✅ Config/settings - small, rarely changes
grav.From("config").Subject("settings.>").Build(ctx)
// ✅ With TTL - entries expire naturally
grav.From("sessions").Subject("active.>").TTL(time.Hour).Build(ctx)

Use Open() (unbounded/unknown):

// ✅ History queries - unknown size
cursor, _ := grav.From("events").Subject("user.123.history.>").Open(ctx)
// ✅ Reports/exports - potentially large
cursor, _ := grav.From("orders").Subject("orders.2024.>").Open(ctx)

Rule: Build() is safe when you control the upper bound via reducers (Top, filters) or domain constraints. If you can’t guarantee bounds → Open().
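
To see why a Top N reducer bounds memory, consider this small sketch: it keeps a sorted slice of at most N entries and discards everything that falls off the end, so the view size is independent of how many events flow through. This is an illustrative stand-in written with the standard library only; the `topN` type is hypothetical, and it treats every event as an independent score, which may differ from how Gravity's `Top` handles repeated subjects.

```go
package main

import (
	"fmt"
	"sort"
)

// score is a hypothetical decoded event.
type score struct {
	Player string
	Value  float64
}

// topN keeps at most n scores, highest first.
type topN struct {
	n     int
	items []score
}

// add inserts s at its sorted position and trims the slice back to n entries.
func (t *topN) add(s score) {
	// items is sorted descending, so find the first entry smaller than s.
	i := sort.Search(len(t.items), func(i int) bool {
		return t.items[i].Value < s.Value
	})
	t.items = append(t.items, score{})
	copy(t.items[i+1:], t.items[i:])
	t.items[i] = s
	if len(t.items) > t.n {
		t.items = t.items[:t.n]
	}
}

func main() {
	top := &topN{n: 3}
	// Feed 1,000 events with distinct values 0..999 in scrambled order.
	for i := 0; i < 1000; i++ {
		top.add(score{Player: fmt.Sprintf("p%d", i), Value: float64((i * 37) % 1000)})
	}
	fmt.Println(len(top.items), top.items[0].Value, top.items[2].Value) // prints: 3 999 997
}
```

After a thousand events the reducer still holds three entries, which is the property that makes `Build()` safe for leaderboards.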

view, _ := grav.From("game").
	Subject("game.scores.>").
	As("leaderboard").
	Top("score", 10).
	Build(ctx)
err := gravity.Scan(ctx, view, func(score Score) error {
	fmt.Printf("%s: %.0f\n", score.Player, score.Value)
	return nil
})

view, _ := grav.From("users").
	Subject("users.profiles.>").
	Where(func(e *gravity.Event) bool {
		status, _ := e.String("status")
		return status == "active"
	}).
	Build(ctx)

// Sum
view, _ := grav.From("orders").Subject("orders.>").Sum("amount").Build(ctx)
// Count
view, _ := grav.From("events").Subject("events.>").Count().Build(ctx)
// Average
view, _ := grav.From("ratings").Subject("ratings.>").Avg("score").Build(ctx)
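
Sum, Count and Avg are cheap to keep up to date because each can be maintained incrementally from a running count and sum. The sketch below shows that arithmetic, plus a simple group-by on top of it. It is a plain-Go illustration under assumed names (`agg`, `order`, `aggregate`) and says nothing about how Gravity stores its aggregates.

```go
package main

import "fmt"

// agg holds a running count and sum for one numeric field; the average is
// derived on demand, so nothing needs to be recomputed from history.
type agg struct {
	count int
	sum   float64
}

func (a *agg) add(v float64) {
	a.count++
	a.sum += v
}

func (a *agg) avg() float64 {
	if a.count == 0 {
		return 0 // avoid dividing by zero for an empty group
	}
	return a.sum / float64(a.count)
}

// order is a hypothetical decoded event.
type order struct {
	Country string
	Amount  float64
}

// aggregate folds orders into one overall aggregate and one per country.
func aggregate(orders []order) (*agg, map[string]*agg) {
	total := &agg{}
	byCountry := make(map[string]*agg)
	for _, o := range orders {
		total.add(o.Amount)
		g, ok := byCountry[o.Country]
		if !ok {
			g = &agg{}
			byCountry[o.Country] = g
		}
		g.add(o.Amount)
	}
	return total, byCountry
}

func main() {
	total, byCountry := aggregate([]order{
		{"SE", 10}, {"MT", 20}, {"SE", 30},
	})
	fmt.Println(total.sum, total.count, total.avg(), byCountry["SE"].sum) // prints: 60 3 20 40
}
```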

Use helper methods in filter functions:

Where(func(e *gravity.Event) bool {
	str, _ := e.String("field")
	num, _ := e.Float("score")
	n, _ := e.Int("count")
	b, _ := e.Bool("active")
	return num > 100 && b
})
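
For readers wondering what such typed accessors might look like, here is a hedged sketch of an event wrapper over a JSON payload. The real `gravity.Event` is not shown in this document, so everything here is an assumption: in particular that payloads are JSON objects and that the second return value is an "ok" flag (the examples above discard it, so it could just as well be an error).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical stand-in for gravity.Event: a decoded JSON object.
type Event struct {
	fields map[string]any
}

func parseEvent(data []byte) (*Event, error) {
	var f map[string]any
	if err := json.Unmarshal(data, &f); err != nil {
		return nil, err
	}
	return &Event{fields: f}, nil
}

// String returns the field as a string; ok is false if missing or another type.
func (e *Event) String(key string) (string, bool) {
	s, ok := e.fields[key].(string)
	return s, ok
}

// Float returns the field as a float64 (encoding/json decodes all numbers so).
func (e *Event) Float(key string) (float64, bool) {
	f, ok := e.fields[key].(float64)
	return f, ok
}

// Int returns the field as an int when it holds a whole number.
func (e *Event) Int(key string) (int, bool) {
	f, ok := e.fields[key].(float64)
	if !ok || f != float64(int(f)) {
		return 0, false
	}
	return int(f), true
}

// Bool returns the field as a bool.
func (e *Event) Bool(key string) (bool, bool) {
	b, ok := e.fields[key].(bool)
	return b, ok
}

// highScorer is the filter from the text: score above 100 and active.
func highScorer(e *Event) bool {
	num, _ := e.Float("score")
	b, _ := e.Bool("active")
	return num > 100 && b
}

// matches parses a payload and applies the filter (demo helper).
func matches(payload string) bool {
	e, err := parseEvent([]byte(payload))
	return err == nil && highScorer(e)
}

func main() {
	fmt.Println(
		matches(`{"score":150,"active":true}`),
		matches(`{"score":50,"active":true}`),
	) // prints: true false
}
```

Ignoring the second return value, as the filter does, means a missing or mistyped field silently evaluates to the zero value and the event is filtered out.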

Views are named with format: v_[namespace_]<hash>[_suffix]

// Auto-generated
view, _ := grav.From("events").Subject("user.>").Build(ctx)
// → v_abc123def456
// With namespace
grav, _ := gravity.NewManager(js, gravity.WithNamespace("dreamz"))
// → v_dreamz_abc123def456
// With suffix
view, _ := grav.From("events").Subject("user.>").Suffix("readonly").Build(ctx)
// → v_abc123def456_readonly
// Explicit name
view, _ := grav.From("events").Subject("user.>").As("my_view").Build(ctx)
// → my_view
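
The naming format can be illustrated with a small function that assembles `v_[namespace_]<hash>[_suffix]`. The hash input and algorithm are not documented here, so this sketch assumes a SHA-256 over the stream name and sorted subject patterns, truncated to 12 hex characters to match the examples; `viewName` and its parameters are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// viewName builds a name of the form v_[namespace_]<hash>[_suffix].
// An explicit name, when given, is returned unchanged.
// Assumption: the hash covers the stream and its sorted subject patterns.
func viewName(namespace, stream string, subjects []string, suffix, explicit string) string {
	if explicit != "" {
		return explicit
	}
	sorted := append([]string(nil), subjects...) // copy so the caller's slice is untouched
	sort.Strings(sorted)
	sum := sha256.Sum256([]byte(stream + "|" + strings.Join(sorted, ",")))

	name := "v_"
	if namespace != "" {
		name += namespace + "_"
	}
	name += hex.EncodeToString(sum[:])[:12]
	if suffix != "" {
		name += "_" + suffix
	}
	return name
}

func main() {
	subjects := []string{"user.>"}
	fmt.Println(viewName("", "events", subjects, "", ""))
	fmt.Println(viewName("dreamz", "events", subjects, "", ""))
	fmt.Println(viewName("", "events", subjects, "readonly", ""))
	fmt.Println(viewName("", "events", subjects, "", "my_view"))
}
```

Deriving the name from the query definition means two callers that build the same view end up sharing one name, while `As()` opts out of that when a stable, human-readable name is needed.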

Benchmarks on Apple M2 (run with go test -bench=. ./internal/infra/gravity/...):

| Operation | 100 items | 1,000 items | 10,000 items |
|---|---|---|---|
| View.Scan | 70µs | 700µs | 7ms |
| Cursor.Scan | 1.3ms | 4ms | 30ms |
| View.Build | 13ms | 73ms | 696ms |

Key insights:

  • View.Scan is 4-20x faster than Cursor.Scan
  • Build() is expensive - one-time cost, only worth it for repeated queries
  • Break-even: Build() cost ≈ 10-30 Cursor scans
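
The break-even figure follows from the benchmark numbers: a view pays off once `build + n × viewScan ≤ n × cursorScan`, that is, `n ≥ build / (cursorScan − viewScan)`. The short program below works this out for the three benchmark sizes; the function name `breakEven` is just for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// breakEven returns the smallest number of queries n for which building a
// view and scanning it n times is no slower than n cursor scans.
// All durations are in microseconds. It returns -1 if the view never wins.
func breakEven(buildUs, viewScanUs, cursorScanUs float64) int {
	if cursorScanUs <= viewScanUs {
		return -1
	}
	return int(math.Ceil(buildUs / (cursorScanUs - viewScanUs)))
}

func main() {
	// Figures taken from the benchmark table (Apple M2).
	fmt.Println("100 items:   ", breakEven(13_000, 70, 1_300))     // 11
	fmt.Println("1,000 items: ", breakEven(73_000, 700, 4_000))    // 23
	fmt.Println("10,000 items:", breakEven(696_000, 7_000, 30_000)) // 31
}
```

The result lands in the 10-30 range quoted above and grows with data size, because build cost rises faster than the per-scan saving.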

| Scenario | NATS Fetch | Gravity (cached) |
|---|---|---|
| Filter 10K messages | 10-50 sec | ~7ms |
| Top 10 from 10K messages | 10-50 sec | ~70µs |
| Repeated queries | 10-50 sec each | ~70µs |

  1. Synchronous - Build() blocks until view is ready
  2. Cached views - Views are always cached (that’s their purpose)
  3. Streaming queries - Builder queries stream by default (O(1) memory)
  4. Auto-sync - Views update automatically as events arrive
  5. Thread-safe - Manager and View are safe for concurrent use

Use Gravity when:

  • You need to query/filter by message content
  • You need aggregations (Top N, Count, Sum, Avg)
  • You’ll run the same query multiple times
  • You need fast reads (<10ms)

Use NATS directly when:

  • You know the exact sequence number
  • You’re consuming messages sequentially
  • Single-subject last message - use stream.GetLastMsgForSubject()