Skip to content

Commit 9fae373

Browse files
committed
feat(run): added ---dry-run --from/--to engine & predicates
1 parent 79f7551 commit 9fae373

File tree

6 files changed

+346
-14
lines changed

6 files changed

+346
-14
lines changed

internal/config/config.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,18 @@ type Dedupe struct {
5151
TTL string `yaml:"ttl"`
5252
}
5353

54+
type RateLimit struct {
55+
Capacity float64 `yaml:"capacity"` // max tokens
56+
Rate float64 `yaml:"rate"` // tokens per second
57+
}
58+
5459
type Rule struct {
55-
ID string `yaml:"id"`
56-
Source string `yaml:"source"`
57-
Match MatchSpec `yaml:"match"`
58-
Sinks []string `yaml:"sinks"`
59-
Dedupe *Dedupe `yaml:"dedupe,omitempty"`
60+
ID string `yaml:"id"`
61+
Source string `yaml:"source"`
62+
Match MatchSpec `yaml:"match"`
63+
Sinks []string `yaml:"sinks"`
64+
Dedupe *Dedupe `yaml:"dedupe,omitempty"`
65+
RateLimit *RateLimit `yaml:"rate_limit,omitempty"`
6066
}
6167

6268
type Sink struct {

internal/engine/predicate.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func compile(expr string) (Predicate, error) {
101101
field := strings.TrimSpace(parts[0])
102102
rhsRaw := strings.TrimSpace(parts[1])
103103

104-
numRHS, rhsIsNum := parseNumber(rhsRaw)
104+
numRHS, rhsIsNum := evaluateNumber(rhsRaw)
105105

106106
return func(args map[string]any) (bool, error) {
107107
val, ok := args[field]
@@ -143,12 +143,56 @@ func compile(expr string) (Predicate, error) {
143143
}, nil
144144
}
145145

146-
func parseNumber(s string) (float64, bool) {
146+
// evaluateNumber evaluates a numeric expression, supporting:
147+
// - Simple numbers: "100", "1e6", "1_000_000"
148+
// - Helper functions: "wei(1e18)", "microAlgos(1e6)"
149+
// - Multiplication: "1_000_000 * 1e6"
150+
func evaluateNumber(s string) (float64, bool) {
151+
s = strings.TrimSpace(s)
147152
s = strings.ReplaceAll(s, "_", "")
153+
154+
// Handle multiplication
155+
if strings.Contains(s, "*") {
156+
parts := strings.Split(s, "*")
157+
if len(parts) != 2 {
158+
return 0, false
159+
}
160+
a, ok1 := evaluateNumber(strings.TrimSpace(parts[0]))
161+
b, ok2 := evaluateNumber(strings.TrimSpace(parts[1]))
162+
if !ok1 || !ok2 {
163+
return 0, false
164+
}
165+
return a * b, true
166+
}
167+
168+
// Check for helper functions: wei(value) or microAlgos(value)
169+
if strings.HasPrefix(s, "wei(") && strings.HasSuffix(s, ")") {
170+
inner := strings.TrimSpace(s[4 : len(s)-1])
171+
v, ok := evaluateNumber(inner)
172+
if !ok {
173+
return 0, false
174+
}
175+
return v, true // wei is already the base unit, no conversion needed
176+
}
177+
if strings.HasPrefix(s, "microAlgos(") && strings.HasSuffix(s, ")") {
178+
inner := strings.TrimSpace(s[11 : len(s)-1])
179+
v, ok := evaluateNumber(inner)
180+
if !ok {
181+
return 0, false
182+
}
183+
return v, true // microAlgos is already the base unit, no conversion needed
184+
}
185+
186+
// Parse as a simple number
148187
v, err := strconv.ParseFloat(s, 64)
149188
return v, err == nil
150189
}
151190

191+
// parseNumber is a simple wrapper for backward compatibility.
192+
func parseNumber(s string) (float64, bool) {
193+
return evaluateNumber(s)
194+
}
195+
152196
func toNumber(v any) (float64, bool) {
153197
switch n := v.(type) {
154198
case int:

internal/engine/predicate_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,139 @@ func TestCompilePredicates_StringEquality(t *testing.T) {
5151
}
5252
}
5353

54+
func TestCompilePredicates_TableDriven(t *testing.T) {
55+
tests := []struct {
56+
name string
57+
expr string
58+
args map[string]any
59+
want bool
60+
wantError bool
61+
}{
62+
// Numeric comparisons
63+
{"numeric_eq", "value == 10", map[string]any{"value": 10}, true, false},
64+
{"numeric_eq_fail", "value == 10", map[string]any{"value": 20}, false, false},
65+
{"numeric_ne", "value != 10", map[string]any{"value": 20}, true, false},
66+
{"numeric_ne_fail", "value != 10", map[string]any{"value": 10}, false, false},
67+
{"numeric_gt", "value > 10", map[string]any{"value": 15}, true, false},
68+
{"numeric_gt_fail", "value > 10", map[string]any{"value": 5}, false, false},
69+
{"numeric_lt", "value < 10", map[string]any{"value": 5}, true, false},
70+
{"numeric_lt_fail", "value < 10", map[string]any{"value": 15}, false, false},
71+
{"numeric_gte", "value >= 10", map[string]any{"value": 10}, true, false},
72+
{"numeric_gte_above", "value >= 10", map[string]any{"value": 15}, true, false},
73+
{"numeric_gte_fail", "value >= 10", map[string]any{"value": 5}, false, false},
74+
{"numeric_lte", "value <= 10", map[string]any{"value": 10}, true, false},
75+
{"numeric_lte_below", "value <= 10", map[string]any{"value": 5}, true, false},
76+
{"numeric_lte_fail", "value <= 10", map[string]any{"value": 15}, false, false},
77+
78+
// String comparisons
79+
{"string_eq", "status == ok", map[string]any{"status": "ok"}, true, false},
80+
{"string_eq_fail", "status == ok", map[string]any{"status": "fail"}, false, false},
81+
{"string_ne", "status != ok", map[string]any{"status": "fail"}, true, false},
82+
{"string_ne_fail", "status != ok", map[string]any{"status": "ok"}, false, false},
83+
84+
// In operator
85+
{"in_match", "sender in a,b,c", map[string]any{"sender": "b"}, true, false},
86+
{"in_match_first", "sender in a,b,c", map[string]any{"sender": "a"}, true, false},
87+
{"in_match_last", "sender in a,b,c", map[string]any{"sender": "c"}, true, false},
88+
{"in_no_match", "sender in a,b,c", map[string]any{"sender": "d"}, false, false},
89+
{"in_missing_field", "sender in a,b,c", map[string]any{"other": "a"}, false, false},
90+
91+
// Contains operator
92+
{"contains_match", "memo contains alert", map[string]any{"memo": "critical alert raised"}, true, false},
93+
{"contains_no_match", "memo contains alert", map[string]any{"memo": "normal message"}, false, false},
94+
{"contains_missing_field", "memo contains alert", map[string]any{"other": "alert"}, false, false},
95+
96+
// Numeric helpers and expressions
97+
{"wei_helper", "value >= wei(1000)", map[string]any{"value": 1000}, true, false},
98+
{"wei_helper_fail", "value >= wei(1000)", map[string]any{"value": 500}, false, false},
99+
{"microAlgos_helper", "amount >= microAlgos(1000000)", map[string]any{"amount": 1000000}, true, false},
100+
{"multiplication", "value >= 1_000_000 * 1e6", map[string]any{"value": 1e12}, true, false},
101+
{"multiplication_fail", "value >= 1_000_000 * 1e6", map[string]any{"value": 1e11}, false, false},
102+
{"scientific_notation", "value >= 1e6", map[string]any{"value": 1e6}, true, false},
103+
{"underscore_separators", "value >= 1_000_000", map[string]any{"value": 1000000}, true, false},
104+
105+
// Type conversions
106+
{"int64_value", "value > 10", map[string]any{"value": int64(15)}, true, false},
107+
{"uint64_value", "value > 10", map[string]any{"value": uint64(15)}, true, false},
108+
{"float64_value", "value > 10", map[string]any{"value": 15.5}, true, false},
109+
{"string_number", "value > 10", map[string]any{"value": "15"}, true, false},
110+
111+
// Missing fields
112+
{"missing_field_numeric", "value > 10", map[string]any{"other": 15}, false, false},
113+
{"missing_field_string", "status == ok", map[string]any{"other": "ok"}, false, false},
114+
115+
// Invalid operators (should error on compile)
116+
{"invalid_op", "value ** 2", map[string]any{"value": 4}, false, true},
117+
}
118+
119+
for _, tt := range tests {
120+
t.Run(tt.name, func(t *testing.T) {
121+
preds, err := CompilePredicates([]string{tt.expr})
122+
if tt.wantError {
123+
if err == nil {
124+
t.Fatalf("expected compile error, got none")
125+
}
126+
return
127+
}
128+
if err != nil {
129+
t.Fatalf("unexpected compile error: %v", err)
130+
}
131+
if len(preds) != 1 {
132+
t.Fatalf("expected 1 predicate, got %d", len(preds))
133+
}
134+
135+
got, err := preds[0](tt.args)
136+
if err != nil {
137+
t.Fatalf("unexpected eval error: %v", err)
138+
}
139+
if got != tt.want {
140+
t.Errorf("predicate(%q) with args %v = %v, want %v", tt.expr, tt.args, got, tt.want)
141+
}
142+
})
143+
}
144+
}
145+
146+
func TestCompilePredicates_MultiplePredicates(t *testing.T) {
147+
tests := []struct {
148+
name string
149+
exprs []string
150+
args map[string]any
151+
want bool
152+
}{
153+
{"all_pass", []string{"value > 10", "value < 20", "status == ok"}, map[string]any{"value": 15, "status": "ok"}, true},
154+
{"first_fails", []string{"value > 10", "value < 20"}, map[string]any{"value": 5}, false},
155+
{"second_fails", []string{"value > 10", "value < 20"}, map[string]any{"value": 25}, false},
156+
{"mixed_types", []string{"value > 10", "sender in a,b,c", "memo contains test"}, map[string]any{"value": 15, "sender": "b", "memo": "test message"}, true},
157+
{"empty_exprs", []string{}, map[string]any{"value": 15}, true},
158+
{"whitespace_exprs", []string{" ", "value > 10", ""}, map[string]any{"value": 15}, true},
159+
}
160+
161+
for _, tt := range tests {
162+
t.Run(tt.name, func(t *testing.T) {
163+
preds, err := CompilePredicates(tt.exprs)
164+
if err != nil {
165+
t.Fatalf("unexpected compile error: %v", err)
166+
}
167+
168+
got := true
169+
for _, p := range preds {
170+
ok, err := p(tt.args)
171+
if err != nil {
172+
t.Fatalf("unexpected eval error: %v", err)
173+
}
174+
if !ok {
175+
got = false
176+
break
177+
}
178+
}
179+
180+
if got != tt.want {
181+
t.Errorf("predicates %v with args %v = %v, want %v", tt.exprs, tt.args, got, tt.want)
182+
}
183+
})
184+
}
185+
}
186+
54187
func TestTokenBucket(t *testing.T) {
55188
tb := NewTokenBucket(2, 1) // capacity=2, 1 token/sec
56189
now := time.Now()
@@ -68,3 +201,79 @@ func TestTokenBucket(t *testing.T) {
68201
t.Fatalf("expected token after refill")
69202
}
70203
}
204+
205+
func TestTokenBucket_TableDriven(t *testing.T) {
206+
tests := []struct {
207+
name string
208+
capacity float64
209+
rate float64
210+
actions []struct {
211+
elapsed time.Duration
212+
want bool
213+
}
214+
}{
215+
{
216+
name: "basic_rate_limit",
217+
capacity: 2,
218+
rate: 1,
219+
actions: []struct {
220+
elapsed time.Duration
221+
want bool
222+
}{
223+
{0, true}, // first token
224+
{0, true}, // second token
225+
{0, false}, // rate limited
226+
{1500 * time.Millisecond, true}, // refilled ~1.5 tokens, consume 1 (0.5 remaining)
227+
{500 * time.Millisecond, true}, // refilled 0.5 tokens (now 1.0 total), can consume
228+
{0, false}, // rate limited again
229+
{1000 * time.Millisecond, true}, // refilled 1 token
230+
},
231+
},
232+
{
233+
name: "high_capacity",
234+
capacity: 10,
235+
rate: 2,
236+
actions: []struct {
237+
elapsed time.Duration
238+
want bool
239+
}{
240+
{0, true}, // consume 1
241+
{0, true}, // consume 2
242+
{0, true}, // consume 3
243+
{1 * time.Second, true}, // refilled 2 tokens (now at 9)
244+
{1 * time.Second, true}, // refilled 2 more (now at 10, capped)
245+
},
246+
},
247+
{
248+
name: "slow_refill",
249+
capacity: 1,
250+
rate: 0.5, // 1 token per 2 seconds
251+
actions: []struct {
252+
elapsed time.Duration
253+
want bool
254+
}{
255+
{0, true}, // consume initial token
256+
{0, false}, // rate limited
257+
{1 * time.Second, false}, // only 0.5 tokens refilled
258+
{2 * time.Second, true}, // 1 token refilled
259+
},
260+
},
261+
}
262+
263+
for _, tt := range tests {
264+
t.Run(tt.name, func(t *testing.T) {
265+
tb := NewTokenBucket(tt.capacity, tt.rate)
266+
now := time.Now()
267+
268+
for i, action := range tt.actions {
269+
if i > 0 {
270+
now = now.Add(action.elapsed)
271+
}
272+
got := tb.Allow(now)
273+
if got != action.want {
274+
t.Errorf("action %d: Allow() = %v, want %v (elapsed: %v)", i, got, action.want, action.elapsed)
275+
}
276+
}
277+
})
278+
}
279+
}

internal/engine/runner.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ type Event struct {
3939
}
4040

4141
type ruleExec struct {
42-
rule config.Rule
43-
preds []Predicate
44-
ttl time.Duration
42+
rule config.Rule
43+
preds []Predicate
44+
ttl time.Duration
45+
rateLimit *TokenBucket
4546
}
4647

4748
// NewRunner builds a runner for the provided config and scanners.
@@ -58,7 +59,11 @@ func NewRunner(store *storage.Store, cfg *config.Config, evmScanners map[string]
5859
ttl = d
5960
}
6061
}
61-
rules[r.ID] = ruleExec{rule: r, preds: preds, ttl: ttl}
62+
var rateLimit *TokenBucket
63+
if r.RateLimit != nil {
64+
rateLimit = NewTokenBucket(r.RateLimit.Capacity, r.RateLimit.Rate)
65+
}
66+
rules[r.ID] = ruleExec{rule: r, preds: preds, ttl: ttl, rateLimit: rateLimit}
6267
}
6368

6469
return &Runner{
@@ -165,9 +170,17 @@ func (r *Runner) handleEvents(ctx context.Context, events []Event) error {
165170
// No side effects in dry-run: skip dedupe and sends.
166171
continue
167172
}
173+
now := r.nowFunc()
174+
175+
// Check rate limit if configured
176+
if exec.rateLimit != nil {
177+
if !exec.rateLimit.Allow(now) {
178+
continue // Rate limited, skip this alert
179+
}
180+
}
181+
168182
if exec.rule.Dedupe != nil {
169183
key := buildDedupeKey(exec.rule.Dedupe.Key, ev)
170-
now := r.nowFunc()
171184
isDup, err := r.store.IsDuplicate(ctx, key, now)
172185
if err != nil {
173186
return err

0 commit comments

Comments
 (0)