Skip to content

Commit 9013dc4

Browse files
committed
Udpate docs for kafka loader and fix query
1 parent 216448f commit 9013dc4

File tree

2 files changed

+85
-36
lines changed

2 files changed

+85
-36
lines changed

apps/kafka_streaming_loader_guide.md

Lines changed: 77 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,36 @@ Stream blockchain data to Kafka topics in real-time.
66

77
```bash
88
uv run python apps/kafka_streaming_loader.py \
9-
--topic anvil_logs \
10-
--query-file apps/queries/anvil_logs.sql \
11-
--raw-dataset anvil \
12-
--start-block 0
9+
--amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
10+
--kafka-brokers localhost:9092 \
11+
--topic erc20_transfers \
12+
--query-file apps/queries/erc20_transfers_activity.sql \
13+
--raw-dataset 'edgeandnode/ethereum_mainnet' \
14+
--network ethereum-mainnet
1315
```
1416

1517
## Basic Usage
1618

17-
### Minimal Example
19+
### Minimal Example (Staging Gateway)
1820

1921
```bash
2022
uv run python apps/kafka_streaming_loader.py \
23+
--amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
24+
--kafka-brokers localhost:9092 \
2125
--topic my_topic \
2226
--query-file my_query.sql \
23-
--raw-dataset eth_firehose
27+
--raw-dataset 'edgeandnode/ethereum_mainnet' \
28+
--network ethereum-mainnet
2429
```
2530

26-
### With Common Options
31+
### Local Development (Anvil)
2732

2833
```bash
2934
uv run python apps/kafka_streaming_loader.py \
30-
--kafka-brokers localhost:9092 \
31-
--topic erc20_transfers \
32-
--query-file apps/queries/erc20_transfers.sql \
33-
--raw-dataset eth_firehose \
34-
--start-block 19000000
35+
--topic anvil_logs \
36+
--query-file apps/queries/anvil_logs.sql \
37+
--raw-dataset anvil \
38+
--start-block 0
3539
```
3640

3741
## Configuration Options
@@ -42,17 +46,17 @@ uv run python apps/kafka_streaming_loader.py \
4246
|----------|-------------|
4347
| `--topic NAME` | Kafka topic name |
4448
| `--query-file PATH` | Path to SQL query file |
45-
| `--raw-dataset NAME` | Dataset name (e.g., `eth_firehose`, `anvil`) |
49+
| `--raw-dataset NAME` | Dataset name (e.g., `edgeandnode/ethereum_mainnet`, `anvil`) |
4650

4751
### Optional Arguments
4852

4953
| Argument | Default | Description |
5054
|----------|---------|-------------|
55+
| `--amp-server URL` | `grpc://127.0.0.1:1602` | AMP server URL (use `grpc+tls://gateway.amp.staging.thegraph.com:443` for staging) |
5156
| `--kafka-brokers` | `localhost:9092` | Kafka broker addresses |
57+
| `--network NAME` | `anvil` | Network identifier (e.g., `ethereum-mainnet`, `anvil`) |
5258
| `--start-block N` | Latest block | Start streaming from this block |
53-
| `--network NAME` | `anvil` | Network identifier |
5459
| `--label-csv PATH` | - | CSV file for data enrichment |
55-
| `--amp-server URL` | `grpc://127.0.0.1:1602` | AMP server URL |
5660

5761
## Message Format
5862

@@ -87,34 +91,77 @@ Consumers should invalidate data in the specified block range.
8791

8892
## Examples
8993

90-
### Stream Anvil Logs
91-
92-
```bash
93-
uv run python apps/kafka_streaming_loader.py \
94-
--topic anvil_logs \
95-
--query-file apps/queries/anvil_logs.sql \
96-
--raw-dataset anvil \
97-
--start-block 0
98-
```
99-
10094
### Stream ERC20 Transfers
10195

96+
Stream ERC20 transfer events with the activity schema:
97+
10298
```bash
10399
uv run python apps/kafka_streaming_loader.py \
100+
--amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
101+
--kafka-brokers localhost:9092 \
104102
--topic erc20_transfers \
105-
--query-file apps/queries/erc20_transfers.sql \
106-
--raw-dataset eth_firehose \
107-
--start-block 19000000 \
108-
--label-csv data/eth_mainnet_token_metadata.csv
103+
--query-file apps/queries/erc20_transfers_activity.sql \
104+
--raw-dataset 'edgeandnode/ethereum_mainnet' \
105+
--network ethereum-mainnet
109106
```
110107

108+
#### Token Metadata Enrichment (Optional)
109+
110+
To enrich transfer events with token metadata (symbol, name, decimals), add a CSV file:
111+
112+
1. **Obtain the token metadata CSV**:
113+
- Download from your token metadata source
114+
- Or export from a database with token information
115+
- Required columns: `token_address`, `symbol`, `name`, `decimals`
116+
117+
2. **Place the CSV file** in the `data/` directory:
118+
```bash
119+
mkdir -p data
120+
# Copy your CSV file
121+
cp /path/to/your/tokens.csv data/eth_mainnet_token_metadata.csv
122+
```
123+
124+
3. **Run with label enrichment**:
125+
```bash
126+
uv run python apps/kafka_streaming_loader.py \
127+
--amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
128+
--kafka-brokers localhost:9092 \
129+
--topic erc20_transfers \
130+
--query-file apps/queries/erc20_transfers_activity.sql \
131+
--raw-dataset 'edgeandnode/ethereum_mainnet' \
132+
--network ethereum-mainnet \
133+
--label-csv data/eth_mainnet_token_metadata.csv
134+
```
135+
136+
**CSV Format Example**:
137+
```csv
138+
token_address,symbol,name,decimals
139+
0xe0f066cb646256d33cae9a32c7b144ccbd248fdd,gg unluck,gg unluck,18
140+
0xabb2a7bec4604491e85a959177cc0e95f60c6bd5,RTX,Remittix,3
141+
```
142+
143+
Without the CSV file, `token_symbol`, `token_name`, and `token_decimals` will be `null` in the output.
144+
111145
### Stream from Latest Blocks
112146

113147
```bash
114148
uv run python apps/kafka_streaming_loader.py \
149+
--amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
150+
--kafka-brokers localhost:9092 \
115151
--topic eth_live_logs \
116152
--query-file apps/queries/all_logs.sql \
117-
--raw-dataset eth_firehose
153+
--raw-dataset 'edgeandnode/ethereum_mainnet' \
154+
--network ethereum-mainnet
155+
```
156+
157+
### Local Development (Anvil)
158+
159+
```bash
160+
uv run python apps/kafka_streaming_loader.py \
161+
--topic anvil_logs \
162+
--query-file apps/queries/anvil_logs.sql \
163+
--raw-dataset anvil \
164+
--start-block 0
118165
```
119166

120167
## Consuming Messages

apps/queries/erc20_transfers_activity.sql

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
-- - token_address: Binary address of the ERC20 token contract
2626
--
2727
-- Example usage:
28-
-- uv apps/kafka_streaming_loader.py \
29-
-- --query-file apps/queries/erc20_transfers_activity.sql \
28+
-- uv run python apps/kafka_streaming_loader.py \
29+
-- --amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
30+
-- --kafka-brokers localhost:9092 \
3031
-- --topic erc20_transfers \
31-
-- --label-csv data/eth_mainnet_token_metadata.csv \
32-
-- --raw-dataset eth_firehose \
33-
-- --network ethereum
32+
-- --query-file apps/queries/erc20_transfers_activity.sql \
33+
-- --raw-dataset 'edgeandnode/ethereum_mainnet' \
34+
-- --network ethereum-mainnet \
35+
-- --label-csv data/eth_mainnet_token_metadata.csv
3436

3537
select
3638
1.0 as version,
@@ -61,7 +63,7 @@ select
6163
cast(null as string) as token_symbol,
6264
cast(null as string) as token_name,
6365
cast(null as int) as token_decimals
64-
from eth_firehose.logs l
66+
from "edgeandnode/ethereum_mainnet".logs l
6567
where
6668
l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
6769
l.topic3 IS NULL

0 commit comments

Comments
 (0)