Skip to content

Commit cd388c2

Browse files
committed
denormalized tables + view tables
1 parent c11d820 commit cd388c2

37 files changed

+1043
-33
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/weaver-index/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ humansize = "2.0"
6868
base64 = "0.22"
6969
dashmap = "6"
7070
include_dir = "0.7.4"
71+
regex = "1"
7172

7273
# WebSocket (for tap consumer)
7374
tokio-tungstenite = { version = "0.26", features = ["native-tls"] }

crates/weaver-index/migrations/clickhouse/001_raw_records.sql

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
-- Raw records from firehose/jetstream
22
-- Core table for all AT Protocol records before denormalization
33
--
4-
-- Uses ReplacingMergeTree to deduplicate on (collection, did, rkey) keeping latest indexed_at
4+
-- Append-only log using plain MergeTree - all versions preserved for audit/rollback.
5+
-- Query-time deduplication via ORDER BY + LIMIT or window functions.
56
-- JSON column stores full record, extract fields only when needed for ORDER BY/WHERE/JOINs
67

78
CREATE TABLE IF NOT EXISTS raw_records (
@@ -10,16 +11,16 @@ CREATE TABLE IF NOT EXISTS raw_records (
1011
collection LowCardinality(String),
1112
rkey String,
1213

13-
-- Content identifier from the record
14+
-- Content identifier from the record (content-addressed hash)
1415
cid String,
1516

16-
-- Repository revision (TID) - monotonically increasing per DID, used for dedup/ordering
17+
-- Repository revision (TID) - monotonically increasing per DID, used for ordering
1718
rev String,
1819

1920
-- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
2021
record JSON,
2122

22-
-- Operation: 'create', 'update', 'delete'
23+
-- Operation: 'create', 'update', 'delete', 'cache' (fetched on-demand)
2324
operation LowCardinality(String),
2425

2526
-- Firehose sequence number (metadata only, not for ordering - can jump on relay restart)
@@ -49,6 +50,5 @@ CREATE TABLE IF NOT EXISTS raw_records (
4950
SELECT * ORDER BY (did, cid)
5051
)
5152
)
52-
ENGINE = ReplacingMergeTree(indexed_at)
53-
ORDER BY (collection, did, rkey, event_time, indexed_at)
54-
SETTINGS deduplicate_merge_projection_mode = 'drop';
53+
ENGINE = MergeTree()
54+
ORDER BY (collection, did, rkey, event_time, indexed_at);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
-- Handle → DID mappings with account status tracking
2+
--
3+
-- Updated from three sources:
4+
-- 1. Identity events (firehose) - handle claims/changes via 009_handle_mappings_identity_mv.sql
5+
-- 2. Account events (firehose) - takedowns/suspensions/deletions via 010_handle_mappings_account_mv.sql
6+
-- 3. Resolution cache flush - XRPC handle resolution results (manual inserts)
7+
--
8+
-- Query pattern: ORDER BY freed ASC, event_time DESC to get active mapping first
9+
10+
CREATE TABLE IF NOT EXISTS handle_mappings (
11+
handle String,
12+
did String,
13+
14+
-- 0 = active, 1 = account deactivated/suspended/deleted
15+
freed UInt8 DEFAULT 0,
16+
17+
-- 'active' | 'takendown' | 'suspended' | 'deleted' | 'deactivated'
18+
account_status LowCardinality(String) DEFAULT 'active',
19+
20+
-- 'identity' (firehose) | 'account' (firehose) | 'resolution' (xrpc cache)
21+
source LowCardinality(String),
22+
23+
-- Canonical event time (firehose event or resolution time)
24+
event_time DateTime64(3),
25+
26+
-- When we indexed this mapping
27+
indexed_at DateTime64(3) DEFAULT now64(3),
28+
29+
-- Fast DID → handle lookups (for account events, profile hydration)
30+
-- Query with ORDER BY freed ASC, event_time DESC to get active mapping
31+
PROJECTION by_did (
32+
SELECT * ORDER BY (did, freed, event_time)
33+
)
34+
)
35+
ENGINE = ReplacingMergeTree(indexed_at)
36+
ORDER BY (handle, did)
37+
SETTINGS deduplicate_merge_projection_mode = 'drop';
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Auto-populate handle_mappings from identity events when handle is present
2+
3+
CREATE MATERIALIZED VIEW IF NOT EXISTS handle_mappings_from_identity_mv TO handle_mappings AS
4+
SELECT
5+
handle,
6+
did,
7+
0 as freed,
8+
'active' as account_status,
9+
'identity' as source,
10+
event_time,
11+
now64(3) as indexed_at
12+
FROM raw_identity_events
13+
WHERE handle != ''
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Auto-populate freed status from account events
2+
-- JOINs against handle_mappings to find current handle for the DID
3+
-- If no mapping exists yet, the JOIN fails silently (can't free unknown handles)
4+
5+
CREATE MATERIALIZED VIEW IF NOT EXISTS handle_mappings_from_account_mv TO handle_mappings AS
6+
SELECT
7+
h.handle,
8+
a.did,
9+
1 as freed,
10+
a.status as account_status,
11+
'account' as source,
12+
a.event_time,
13+
now64(3) as indexed_at
14+
FROM raw_account_events a
15+
INNER JOIN handle_mappings h ON h.did = a.did AND h.freed = 0
16+
WHERE a.active = 0 AND a.status != ''
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- Weaver profile source table
2+
-- Populated by MV from raw_records, merged into profiles by refreshable MV
3+
4+
CREATE TABLE IF NOT EXISTS profiles_weaver (
5+
did String,
6+
7+
-- Raw profile JSON
8+
profile String,
9+
10+
-- Extracted fields for coalescing
11+
display_name String DEFAULT '',
12+
description String DEFAULT '',
13+
avatar_cid String DEFAULT '',
14+
banner_cid String DEFAULT '',
15+
16+
-- Timestamps
17+
created_at DateTime64(3) DEFAULT toDateTime64(0, 3),
18+
event_time DateTime64(3),
19+
indexed_at DateTime64(3) DEFAULT now64(3),
20+
21+
-- Soft delete (epoch = not deleted)
22+
deleted_at DateTime64(3) DEFAULT toDateTime64(0, 3)
23+
)
24+
ENGINE = ReplacingMergeTree(indexed_at)
25+
ORDER BY did
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Populate profiles_weaver from raw_records
2+
3+
CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_weaver_mv TO profiles_weaver AS
4+
SELECT
5+
did,
6+
toString(record) as profile,
7+
coalesce(record.displayName, '') as display_name,
8+
coalesce(record.description, '') as description,
9+
coalesce(record.avatar.ref.`$link`, '') as avatar_cid,
10+
coalesce(record.banner.ref.`$link`, '') as banner_cid,
11+
coalesce(toDateTime64(record.createdAt, 3), toDateTime64(0, 3)) as created_at,
12+
event_time,
13+
now64(3) as indexed_at,
14+
if(operation = 'delete', event_time, toDateTime64(0, 3)) as deleted_at
15+
FROM raw_records
16+
WHERE collection = 'sh.weaver.actor.profile'
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- Bluesky profile source table
2+
-- Populated by MV from raw_records, merged into profiles by refreshable MV
3+
4+
CREATE TABLE IF NOT EXISTS profiles_bsky (
5+
did String,
6+
7+
-- Raw profile JSON
8+
profile String,
9+
10+
-- Extracted fields for coalescing
11+
display_name String DEFAULT '',
12+
description String DEFAULT '',
13+
avatar_cid String DEFAULT '',
14+
banner_cid String DEFAULT '',
15+
16+
-- Timestamps
17+
created_at DateTime64(3) DEFAULT toDateTime64(0, 3),
18+
event_time DateTime64(3),
19+
indexed_at DateTime64(3) DEFAULT now64(3),
20+
21+
-- Soft delete (epoch = not deleted)
22+
deleted_at DateTime64(3) DEFAULT toDateTime64(0, 3)
23+
)
24+
ENGINE = ReplacingMergeTree(indexed_at)
25+
ORDER BY did
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Populate profiles_bsky from raw_records
2+
3+
CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_bsky_mv TO profiles_bsky AS
4+
SELECT
5+
did,
6+
toString(record) as profile,
7+
coalesce(record.displayName, '') as display_name,
8+
coalesce(record.description, '') as description,
9+
coalesce(record.avatar.ref.`$link`, '') as avatar_cid,
10+
coalesce(record.banner.ref.`$link`, '') as banner_cid,
11+
coalesce(toDateTime64(record.createdAt, 3), toDateTime64(0, 3)) as created_at,
12+
event_time,
13+
now64(3) as indexed_at,
14+
if(operation = 'delete', event_time, toDateTime64(0, 3)) as deleted_at
15+
FROM raw_records
16+
WHERE collection = 'app.bsky.actor.profile'

0 commit comments

Comments
 (0)