Gravity - Materialized Views
Gravity turns NATS JetStream into a queryable StreamDB. Build materialized views from event streams with filters and reducers.
Why Gravity?
NATS streams are write-optimized append logs, not query engines. Gravity adds the missing query layer.
The Problem with Direct NATS Fetch
NATS streams index messages by sequence number, timestamp, and subject, but not by payload content. Filtering by content therefore requires reading ALL messages:
```go
// ❌ Slow: must scan 10K messages to find filtered results
sub, _ := js.Subscribe("events.>", ...)
for i := 0; i < 10000; i++ {
	msg, _ := sub.NextMsg(time.Second)
	// Parse, filter, aggregate...
}
```
Why DeliverLastPerSubject Doesn’t Help
A common misconception is that NATS delivery policies like DeliverLastPerSubject provide fast query access. They don’t - they’re for consumers, not queries:
```go
// ❌ Still slow - sequential consumption, not random access
func LastPerSubject(filters []string) (map[string][][]byte, error) {
	consumer, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
		DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
	})

	allMessages := []jetstream.Msg{}
	for {
		batch, _ := consumer.Fetch(20000, ...)
		for msg := range batch.Messages() {
			allMessages = append(allMessages, msg) // All in memory!
		}
	}
	return result // 200-600ms, can't filter, no Top N
}
```
Problems:
- Sequential only - must consume one at a time
- No content filtering without parsing every message
- No aggregations (Top N, Sum, Count)
- Loads all into memory
Note: For single-subject queries, use stream.GetLastMsgForSubject("user.123") - that’s fast!
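The problems above come down to scanning versus indexing. The sketch below illustrates that contrast with plain in-memory slices and maps; it does not use NATS or Gravity, and the `Msg` type and both function names are hypothetical. The content filter has to decode every payload, while the per-subject map is built once and then answers lookups with a single read.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg is a hypothetical stand-in for a stream message.
type Msg struct {
	Subject string
	Data    []byte
}

type User struct {
	Name   string `json:"name"`
	Status string `json:"status"`
}

// scanActive filters by content: every payload is decoded,
// so the cost grows with the length of the log.
func scanActive(log []Msg) []string {
	var names []string
	for _, m := range log {
		var u User
		if err := json.Unmarshal(m.Data, &u); err != nil {
			continue // skip malformed payloads
		}
		if u.Status == "active" {
			names = append(names, u.Name)
		}
	}
	return names
}

// lastBySubject builds a per-subject index in one pass;
// later lookups are single map reads.
func lastBySubject(log []Msg) map[string]Msg {
	idx := make(map[string]Msg, len(log))
	for _, m := range log {
		idx[m.Subject] = m // later messages overwrite earlier ones
	}
	return idx
}

func main() {
	log := []Msg{
		{"user.1", []byte(`{"name":"ann","status":"active"}`)},
		{"user.2", []byte(`{"name":"bob","status":"idle"}`)},
		{"user.1", []byte(`{"name":"ann","status":"idle"}`)},
	}
	fmt.Println(scanActive(log))
	fmt.Println(string(lastBySubject(log)["user.1"].Data))
}
```

Note that the scan reports a user who was active at some point in the log, while the index reflects only the latest message per subject. A materialized view keeps the second kind of answer up to date.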
How Gravity Solves This
```go
// ✅ Fast: pre-materialized view with ~1µs reads
view, _ := grav.From("events").Subject("user.>").Top("score", 10).Build(ctx)
err := gravity.Scan(ctx, view, func(u User) error { ... }) // Instant!
```
Quick Start
```go
import (
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/moonwalker/orbital/internal/infra/gravity"
)

nc, _ := nats.Connect("nats://localhost:4222")
js, _ := jetstream.New(nc)

grav, _ := gravity.NewManager(js)
defer grav.Close()

// Build a view
view, _ := grav.From("events").
	Subject("user.>").
	As("active_users").
	Top("score", 10).
	Build(ctx)

// Iterate with automatic unmarshaling
err := gravity.Scan(ctx, view, func(user User) error {
	fmt.Println(user.Name)
	return nil
})
```
Core API
Manager
```go
func NewManager(js jetstream.JetStream, opts ...Option) (*Manager, error)
func (m *Manager) From(stream string) *Builder
func (m *Manager) DeleteView(ctx context.Context, viewName string) error
func (m *Manager) Close() error

// With namespace for multi-brand deployments
grav, _ := gravity.NewManager(js, gravity.WithNamespace("dreamz"))
```
View
```go
func (v *View) Get(ctx context.Context, key string, dest any) error
func (v *View) Iterate(ctx context.Context) (<-chan *Entry, <-chan error)
func (v *View) Name() string
```
Scan Function
```go
// Works with both View (cached) and Builder (streaming)
func Scan[T any](ctx context.Context, s Scanner, fn func(T) error) error

type Entry struct {
	Key   string
	Value []byte
}
```
Builder API
```go
grav.From("stream")           // Start builder
    .Subject("pattern.>")     // Filter subjects
    .Subjects("a.>", "b.>")   // Multiple patterns
    .As("view_name")          // Explicit name
    .Suffix("readonly")       // Add suffix to auto-name
    .Where(filterFn)          // Filter events
    .Last()                   // Keep last per subject (default)
    .First()                  // Keep first per subject
    .Count()                  // Count events
    .Top("field", 10)         // Top N by field
    .Bottom("field", 10)      // Bottom N by field
    .Sum("field")             // Sum numeric field
    .Avg("field")             // Average numeric field
    .GroupBy("field")         // Group by field
    .Merge()                  // Merge all fields
    .Build(ctx)               // → *View (cached)
    .Open(ctx)                // → *Cursor (streaming)
```
Using Scan
The unified gravity.Scan function works with both *View and *Cursor:
```go
// Cached view - Build() creates materialized view (~1µs per iteration)
view, _ := grav.From("events").Subject("user.>").Build(ctx)
err := gravity.Scan(ctx, view, func(user User) error {
	fmt.Printf("Processing: %s\n", user.Name)
	return nil
})

// Streaming cursor - Open() streams with O(1) memory
cursor, _ := grav.From("events").Subject("user.>").Open(ctx)
err := gravity.Scan(ctx, cursor, func(user User) error {
	return nil
})
```
When to Use Each Pattern
| Method | Returns | Memory | Speed | Use Case |
|---|---|---|---|---|
| Build(ctx) | *View | O(n) | ~70µs/100 items | Hot paths, repeated queries |
| Open(ctx) | *Cursor | O(1) | ~1.3ms/100 items | Ad-hoc queries, unknown size |
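Both rows of the table are consumed through the same Scan call. The following is a minimal sketch of how a generic scan over a common interface could work, not Gravity's actual implementation: the `Scanner` interface shape, its `Each` method, the `sliceScanner` type, and the use of JSON for decoding are all assumptions made for illustration, and the context parameter is omitted for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Entry mirrors the Entry struct shown in the Core API section.
type Entry struct {
	Key   string
	Value []byte
}

// Scanner is a hypothetical interface: anything that can yield entries in order.
type Scanner interface {
	Each(fn func(Entry) error) error
}

// sliceScanner is an in-memory Scanner standing in for a cached view.
type sliceScanner []Entry

func (s sliceScanner) Each(fn func(Entry) error) error {
	for _, e := range s {
		if err := fn(e); err != nil {
			return err
		}
	}
	return nil
}

// Scan decodes each entry into T (JSON is an assumption) and passes it to fn.
// It stops at the first decode or callback error.
func Scan[T any](s Scanner, fn func(T) error) error {
	return s.Each(func(e Entry) error {
		var v T
		if err := json.Unmarshal(e.Value, &v); err != nil {
			return fmt.Errorf("decode %q: %w", e.Key, err)
		}
		return fn(v)
	})
}

type User struct {
	Name string `json:"name"`
}

func main() {
	view := sliceScanner{
		{Key: "user.1", Value: []byte(`{"name":"ann"}`)},
		{Key: "user.2", Value: []byte(`{"name":"bob"}`)},
	}
	err := Scan(view, func(u User) error {
		fmt.Println(u.Name)
		return nil
	})
	if err != nil {
		fmt.Println("scan failed:", err)
	}
}
```

Because the callback's parameter type fixes `T`, callers do not need to spell out the type argument. A streaming source would implement the same interface and yield entries as they arrive instead of from a slice.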
Decision guide:
- < 10 queries → use Open() (cursor)
- > 30 queries → use Build() (view)
- Hot path / continuous → always Build()
- One-off / ad-hoc → always Open()
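The guide above can be written down as a small helper. This is only a sketch: `chooseAccess`, the `Access` type, and its constants are hypothetical names and not part of Gravity. The guide gives no rule for 10 to 30 queries, so the helper returns a "measure" result for that range instead of guessing.

```go
package main

import "fmt"

// Access names which builder terminal to call (hypothetical type).
type Access string

const (
	UseOpen  Access = "Open"
	UseBuild Access = "Build"
	Measure  Access = "measure"
)

// chooseAccess encodes the decision guide: hot paths always build,
// few queries stream, many queries build.
func chooseAccess(expectedQueries int, hotPath bool) Access {
	switch {
	case hotPath:
		return UseBuild
	case expectedQueries < 10:
		return UseOpen
	case expectedQueries > 30:
		return UseBuild
	default:
		return Measure // 10-30 queries: no rule given, benchmark both
	}
}

func main() {
	fmt.Println(chooseAccess(1, false))   // one-off query
	fmt.Println(chooseAccess(500, false)) // repeated queries
	fmt.Println(chooseAccess(20, false))  // in the gap between the thresholds
}
```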
Live Updates
Views are live, not snapshots. After Build() returns, the consumer continues running in the background:
```go
// Build once at startup
leaderboard, _ := grav.From("game").Subject("scores.>").Top("score", 100).Build(ctx)

// Consumer keeps running, updating cache as new scores arrive
for {
	gravity.Scan(ctx, leaderboard, func(s Score) error {
		// Always the latest top 100 - updated in real-time
		return nil
	})
	time.Sleep(time.Second)
}
```
Memory Safety
Safe for Build() (bounded data):
```go
// ✅ Top N - bounded to 100 entries max
grav.From("game").Subject("scores.>").Top("score", 100).Build(ctx)

// ✅ Config/settings - small, rarely changes
grav.From("config").Subject("settings.>").Build(ctx)

// ✅ With TTL - entries expire naturally
grav.From("sessions").Subject("active.>").TTL(time.Hour).Build(ctx)
```
Use Open() (unbounded/unknown):
```go
// ✅ History queries - unknown size
cursor, _ := grav.From("events").Subject("user.123.history.>").Open(ctx)

// ✅ Reports/exports - potentially large
cursor, _ := grav.From("orders").Subject("orders.2024.>").Open(ctx)
```
Rule: Build() is safe when you control the upper bound via reducers (Top, filters) or domain constraints. If you can’t guarantee bounds → Open().
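To see why a Top N reducer bounds memory, consider a sketch that keeps only N scores in a min-heap, however many events pass through. This is an illustration of the general technique, not Gravity's reducer: `TopN`, `NewTopN`, and the `Score` fields are hypothetical names, and the sketch treats every event as an independent score with no per-subject deduplication.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

type Score struct {
	Player string
	Value  float64
}

// minHeap keeps the smallest retained score at the root so it can be evicted cheaply.
type minHeap []Score

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].Value < h[j].Value }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(Score)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// TopN retains at most n scores no matter how many are added.
type TopN struct {
	n int
	h minHeap
}

func NewTopN(n int) *TopN { return &TopN{n: n} }

// Add offers a score; it is kept only if it ranks within the current top n.
func (t *TopN) Add(s Score) {
	if t.n <= 0 {
		return
	}
	if len(t.h) < t.n {
		heap.Push(&t.h, s)
		return
	}
	if s.Value > t.h[0].Value {
		t.h[0] = s // replace the current minimum
		heap.Fix(&t.h, 0)
	}
}

// Sorted returns a copy of the retained scores, highest first.
func (t *TopN) Sorted() []Score {
	out := append([]Score(nil), t.h...)
	sort.Slice(out, func(i, j int) bool { return out[i].Value > out[j].Value })
	return out
}

func main() {
	top := NewTopN(3)
	for i := 0; i < 10000; i++ {
		top.Add(Score{Player: fmt.Sprintf("p%d", i), Value: float64(i % 977)})
	}
	fmt.Println(len(top.Sorted())) // never more than 3 entries retained
}
```

Each Add costs O(log N) and the retained set never exceeds N, which is the property that makes a reducer-bounded view safe to cache.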
Common Patterns
Leaderboard (Top 10)
```go
view, _ := grav.From("game").
	Subject("game.scores.>").
	As("leaderboard").
	Top("score", 10).
	Build(ctx)

err := gravity.Scan(ctx, view, func(score Score) error {
	fmt.Printf("%s: %.0f\n", score.Player, score.Value)
	return nil
})
```
Filtered View
```go
view, _ := grav.From("users").
	Subject("users.profiles.>").
	Where(func(e *gravity.Event) bool {
		status, _ := e.String("status")
		return status == "active"
	}).
	Build(ctx)
```
Aggregations
```go
// Sum
view, _ := grav.From("orders").Subject("orders.>").Sum("amount").Build(ctx)

// Count
view, _ := grav.From("events").Subject("events.>").Count().Build(ctx)

// Average
view, _ := grav.From("ratings").Subject("ratings.>").Avg("score").Build(ctx)
```
Event Helpers
Use helper methods in filter functions:
```go
Where(func(e *gravity.Event) bool {
	str, _ := e.String("field")
	num, _ := e.Float("score")
	n, _ := e.Int("count")
	b, _ := e.Bool("active")
	return num > 100 && b
})
```
View Naming
Views are named using the format v_[namespace_]<hash>[_suffix]:
```go
// Auto-generated
view, _ := grav.From("events").Subject("user.>").Build(ctx)
// → v_abc123def456

// With namespace
grav, _ := gravity.NewManager(js, gravity.WithNamespace("dreamz"))
// → v_dreamz_abc123def456

// With suffix
view, _ := grav.From("events").Subject("user.>").Suffix("readonly").Build(ctx)
// → v_abc123def456_readonly

// Explicit name
view, _ := grav.From("events").Subject("user.>").As("my_view").Build(ctx)
// → my_view
```
Performance
Benchmarks on Apple M2 (run with go test -bench=. ./internal/infra/gravity/...):
| Operation | 100 items | 1,000 items | 10,000 items |
|---|---|---|---|
| View.Scan | 70µs | 700µs | 7ms |
| Cursor.Scan | 1.3ms | 4ms | 30ms |
| View.Build | 13ms | 73ms | 696ms |
Key insights:
- View.Scan is 4-20x faster than Cursor.Scan
- Build() is expensive - one-time cost, only worth it for repeated queries
- Break-even: Build() cost ≈ 10-30 Cursor scans
| Scenario | NATS Fetch | Gravity (cached) |
|---|---|---|
| Filter 10K messages | 10-50 sec | ~7ms |
| Top 10 from 10K messages | 10-50 sec | ~70µs |
| Repeated queries | 10-50 sec each | ~70µs |
Design Principles
- Synchronous - Build() blocks until view is ready
- Cached views - Views are always cached (that’s their purpose)
- Streaming queries - Builder queries stream by default (O(1) memory)
- Auto-sync - Views update automatically as events arrive
- Thread-safe - Manager and View are safe for concurrent use
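The auto-sync and thread-safety principles can be pictured as a cache that a background goroutine keeps updating while readers query it under a read lock. The sketch below shows one common way to build such a structure; `LiveView`, `Event`, and the method names are hypothetical, and Gravity's internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for a decoded stream message.
type Event struct {
	Subject string
	Data    string
}

// LiveView is a hypothetical last-per-subject cache that is safe for concurrent use.
type LiveView struct {
	mu   sync.RWMutex
	last map[string]string
}

func NewLiveView() *LiveView {
	return &LiveView{last: make(map[string]string)}
}

// Run applies events until the channel is closed; it plays the role of
// the background consumer that keeps the view in sync.
func (v *LiveView) Run(events <-chan Event) {
	for e := range events {
		v.mu.Lock()
		v.last[e.Subject] = e.Data
		v.mu.Unlock()
	}
}

// Get returns the latest value for a subject; many readers may call it concurrently.
func (v *LiveView) Get(subject string) (string, bool) {
	v.mu.RLock()
	defer v.mu.RUnlock()
	d, ok := v.last[subject]
	return d, ok
}

func main() {
	view := NewLiveView()
	events := make(chan Event)
	done := make(chan struct{})
	go func() {
		view.Run(events)
		close(done)
	}()

	events <- Event{"scores.ann", "10"}
	events <- Event{"scores.ann", "25"}
	close(events)
	<-done // wait for the consumer to drain so the read below is deterministic

	score, _ := view.Get("scores.ann")
	fmt.Println(score)
}
```

A read-write mutex suits this access pattern because reads greatly outnumber writes on a hot path, and readers never block each other.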
When to Use Gravity
Use Gravity when:
- You need to query/filter by message content
- You need aggregations (Top N, Count, Sum, Avg)
- You’ll run the same query multiple times
- You need fast reads (<10ms)
Use NATS directly when:
- You know the exact sequence number
- You’re consuming messages sequentially
- Single-subject last message - use stream.GetLastMsgForSubject()