# Stream Processing

SQLMapper provides stream processing capabilities for handling large SQL files efficiently. This feature allows you to process SQL dumps in parallel, improving performance for large datasets.

## Overview

The stream processing feature:

- Processes SQL statements in parallel
- Uses worker pools for efficient resource utilization
- Supports all major database types
- Provides real-time callbacks for processed objects
- Handles large SQL files without loading the entire content into memory

## Usage

### Basic Stream Processing

```go
parser := sqlmapper.NewStreamParser(sqlmapper.MySQL)

err := parser.ParseStream(sqlContent, func(obj interface{}) error {
	switch obj.(type) {
	case sqlmapper.Table:
		// Handle table
	case sqlmapper.View:
		// Handle view
	case sqlmapper.Function:
		// Handle function
	}
	return nil
})
```

### Parallel Stream Processing

```go
parser := sqlmapper.NewStreamParser(sqlmapper.MySQL)

// Process with 4 workers
err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	switch v := obj.(type) {
	case sqlmapper.Table:
		fmt.Printf("Processing table: %s\n", v.Name)
	case sqlmapper.View:
		fmt.Printf("Processing view: %s\n", v.Name)
	}
	return nil
})
```

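Note that with parallel processing the callback may be invoked concurrently from multiple workers, so any state it shares must be synchronized (the progress-tracking example below uses `sync/atomic` for exactly this reason).
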
## Configuration

### Worker Pool Size

The number of workers can be configured based on your system's capabilities:

```go
// For CPU-bound tasks
workers := runtime.NumCPU()

// For I/O-bound tasks, oversubscribe the cores instead
workers = runtime.NumCPU() * 2
```

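A small helper can make this choice explicit at the call site. This is a sketch, not part of the SQLMapper API; the `chooseWorkers` name and the cap of 32 are illustrative:

```go
import "runtime"

// chooseWorkers returns a worker count for ParseStreamParallel:
// one worker per core for CPU-bound work, twice that for I/O-bound
// work, capped to keep memory usage bounded.
func chooseWorkers(ioBound bool) int {
	n := runtime.NumCPU()
	if ioBound {
		n *= 2
	}
	if n > 32 {
		n = 32 // illustrative cap; tune for your system
	}
	return n
}
```

Used as `parser.ParseStreamParallel(sqlContent, chooseWorkers(true), callback)`.
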
### Supported Object Types

The stream processor can handle various SQL objects:

- Tables
- Views
- Functions
- Procedures
- Triggers
- Indexes
- Sequences

Each object is passed to the callback function as it's processed.

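For example, a quick inventory of a dump can be built by tallying the concrete type of each object. A minimal sketch using the sequential `ParseStream` (so the map needs no locking):

```go
counts := make(map[string]int)

err := parser.ParseStream(sqlContent, func(obj interface{}) error {
	counts[fmt.Sprintf("%T", obj)]++ // e.g. "sqlmapper.Table"
	return nil
})

for typ, n := range counts {
	fmt.Printf("%s: %d\n", typ, n)
}
```
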
## Error Handling

An error returned from the processing callback is propagated back to the caller:

```go
err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	if err := processObject(obj); err != nil {
		return fmt.Errorf("failed to process object: %w", err)
	}
	return nil
})

if err != nil {
	log.Printf("Stream processing error: %v", err)
}
```

## Best Practices

1. **Worker Pool Size**
   - Start with the number of CPU cores
   - Adjust based on performance monitoring
   - Consider memory constraints

2. **Error Handling**
   - Always check for errors in the callback
   - Log errors appropriately
   - Consider implementing retry logic (see the Error Recovery example below)

3. **Memory Management**
   - Process objects as they arrive
   - Avoid accumulating large amounts of data in the callback
   - Use channels to hand objects off to downstream consumers, as sketched after this list

4. **Performance Optimization**
   - Monitor CPU and memory usage
   - Adjust the worker count based on system load
   - Consider batch processing for small objects
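
As an example of the channel advice above, the callback can stay small by handing each object to a single consumer goroutine. A minimal sketch; the buffer size of 100 and the `handle` function are illustrative, not part of the SQLMapper API:

```go
objects := make(chan interface{}, 100)
done := make(chan struct{})

// Consumer: drains the channel while the workers keep producing.
go func() {
	defer close(done)
	for obj := range objects {
		handle(obj) // illustrative downstream processing
	}
}()

err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	objects <- obj
	return nil
})

close(objects) // no more objects; let the consumer drain and exit
<-done
```
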
## Examples

### Processing with Progress Tracking

```go
var processed int64

err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	// Print the value returned by AddInt64 rather than re-reading
	// processed, which would race with the other workers.
	n := atomic.AddInt64(&processed, 1)
	fmt.Printf("\rProcessed objects: %d", n)
	return nil
})
```

### Custom Object Processing

```go
err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	switch v := obj.(type) {
	case sqlmapper.Table:
		return processTable(v)
	case sqlmapper.View:
		return processView(v)
	case sqlmapper.Function:
		return processFunction(v)
	default:
		return fmt.Errorf("unsupported object type: %T", obj)
	}
})
```

### Error Recovery

```go
err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	// Retry each object up to three times before giving up.
	for attempts := 0; attempts < 3; attempts++ {
		if err := processObject(obj); err != nil {
			if attempts == 2 {
				return err // out of retries
			}
			continue
		}
		break // success
	}
	return nil
})
```

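If failures are transient (a busy downstream database, for instance), pausing between attempts often helps. A variant of the loop above with linear backoff; `processObject` is the same hypothetical helper:

```go
err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	var lastErr error
	for attempt := 1; attempt <= 3; attempt++ {
		if lastErr = processObject(obj); lastErr == nil {
			return nil
		}
		if attempt < 3 {
			// Wait 100ms, then 200ms, before retrying.
			time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
		}
	}
	return lastErr
})
```
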
## Limitations

- Maximum file size depends on available system memory
- Some complex SQL statements might require sequential processing
- Performance depends on system resources and SQL complexity

## Performance Considerations

- Worker pool size affects memory usage
- Large SQL files might require batching, as sketched below
- Consider network I/O for database operations
- Monitor system resources during processing
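
When each object triggers per-object overhead downstream (one database round trip each, say), batching amortizes the cost. A minimal sketch that flushes every 100 objects; the batch size and the `flush` helper are illustrative, and the mutex is required because parallel workers share the slice:

```go
var (
	mu    sync.Mutex
	batch []interface{}
)

err := parser.ParseStreamParallel(sqlContent, 4, func(obj interface{}) error {
	mu.Lock()
	defer mu.Unlock()
	batch = append(batch, obj)
	if len(batch) >= 100 {
		if err := flush(batch); err != nil { // illustrative bulk write
			return err
		}
		batch = nil
	}
	return nil
})

// Flush whatever remains once parsing has finished.
if err == nil && len(batch) > 0 {
	err = flush(batch)
}
```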
