Skip to content

Commit 79f7551

Browse files
committed
feat(run): add logging, dry-run semantics, health stub
1 parent 5abd90d commit 79f7551

File tree

8 files changed

+197
-8
lines changed

8 files changed

+197
-8
lines changed

cmd/watch-tower/run.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/devblac/watch-tower/internal/source/algorand"
1111
"github.com/devblac/watch-tower/internal/source/evm"
1212
"github.com/devblac/watch-tower/internal/storage"
13+
"github.com/devblac/watch-tower/internal/logging"
1314
"github.com/spf13/cobra"
1415
)
1516

@@ -31,6 +32,7 @@ var runCmd = &cobra.Command{
3132
Use: "run",
3233
Short: "Run watch-tower pipelines",
3334
RunE: func(cmd *cobra.Command, args []string) error {
35+
log := logging.New()
3436
ctx := cmd.Context()
3537

3638
cfg, err := config.Load(cfgPath)
@@ -115,6 +117,7 @@ var runCmd = &cobra.Command{
115117
if err := runner.RunOnce(ctx); err != nil {
116118
return err
117119
}
120+
log.Info("tick complete", "dry_run", flagDryRun)
118121
if flagOnce {
119122
break
120123
}

internal/engine/predicate.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ type Predicate func(args map[string]any) (bool, error)
1313
// CompilePredicates parses simple expressions into executable predicates.
1414
// Supported operators: ==, !=, >, <, in, contains.
1515
// Examples:
16-
// "value > 10"
17-
// "sender in a,b,c"
18-
// "memo contains alert"
16+
//
17+
// "value > 10"
18+
// "sender in a,b,c"
19+
// "memo contains alert"
1920
func CompilePredicates(exprs []string) ([]Predicate, error) {
2021
var preds []Predicate
2122
for _, raw := range exprs {

internal/engine/runner.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ func (r *Runner) handleEvents(ctx context.Context, events []Event) error {
161161
if err != nil || !pass {
162162
continue
163163
}
164+
if r.dryRun {
165+
// No side effects in dry-run: skip dedupe and sends.
166+
continue
167+
}
164168
if exec.rule.Dedupe != nil {
165169
key := buildDedupeKey(exec.rule.Dedupe.Key, ev)
166170
now := r.nowFunc()
@@ -179,9 +183,6 @@ func (r *Runner) handleEvents(ctx context.Context, events []Event) error {
179183
return err
180184
}
181185
}
182-
if r.dryRun {
183-
continue
184-
}
185186
for _, sinkID := range exec.rule.Sinks {
186187
s := r.sinks[sinkID]
187188
if s == nil {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package engine
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestBuildDedupeKey(t *testing.T) {
8+
idx := uint(5)
9+
ev := Event{
10+
TxHash: "0xabc",
11+
LogIndex: &idx,
12+
AppID: 42,
13+
}
14+
15+
key := buildDedupeKey("txhash:logIndex:app_id", ev)
16+
if key != "0xabc:5:42" {
17+
t.Fatalf("unexpected key: %s", key)
18+
}
19+
20+
key = buildDedupeKey("", ev)
21+
if key != "0xabc" {
22+
t.Fatalf("default key mismatch: %s", key)
23+
}
24+
}

internal/engine/runner_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/devblac/watch-tower/internal/config"
9+
"github.com/devblac/watch-tower/internal/sink"
10+
"github.com/devblac/watch-tower/internal/storage"
11+
)
12+
13+
type fakeSink struct {
14+
count int
15+
}
16+
17+
func (f *fakeSink) Send(ctx context.Context, payload sink.EventPayload) error {
18+
f.count++
19+
return nil
20+
}
21+
22+
// Simple integration: ensure predicates + dedupe + dry-run behave.
23+
func TestRunnerPredicatesAndDryRun(t *testing.T) {
24+
store := newTestStore(t)
25+
rule := config.Rule{
26+
ID: "r1",
27+
Match: config.MatchSpec{Where: []string{"value > 10"}},
28+
Sinks: []string{"s1"},
29+
Dedupe: &config.Dedupe{
30+
Key: "txhash",
31+
TTL: "1h",
32+
},
33+
}
34+
cfg := &config.Config{Rules: []config.Rule{rule}}
35+
s := &fakeSink{}
36+
runner, err := NewRunner(store, cfg, nil, nil, map[string]sink.Sender{"s1": s}, true, 0, 0)
37+
if err != nil {
38+
t.Fatalf("runner: %v", err)
39+
}
40+
runner.nowFunc = func() time.Time { return time.Now() }
41+
42+
evs := []Event{{
43+
RuleID: "r1",
44+
TxHash: "0x1",
45+
Args: map[string]any{"value": 20},
46+
}}
47+
if err := runner.handleEvents(context.Background(), evs); err != nil {
48+
t.Fatalf("handle: %v", err)
49+
}
50+
if s.count != 0 { // dry-run should skip sends
51+
t.Fatalf("expected no sends in dry-run, got %d", s.count)
52+
}
53+
54+
// now run non-dry and ensure dedupe prevents duplicate
55+
runner.dryRun = false
56+
if err := runner.handleEvents(context.Background(), evs); err != nil {
57+
t.Fatalf("handle: %v", err)
58+
}
59+
if s.count != 1 {
60+
t.Fatalf("expected 1 send, got %d", s.count)
61+
}
62+
if err := runner.handleEvents(context.Background(), evs); err != nil {
63+
t.Fatalf("handle dup: %v", err)
64+
}
65+
if s.count != 1 {
66+
t.Fatalf("expected dedupe to skip duplicate send")
67+
}
68+
}
69+
70+
func newTestStore(t *testing.T) *storage.Store {
71+
t.Helper()
72+
store, err := storage.Open(t.TempDir() + "/db.sqlite")
73+
if err != nil {
74+
t.Fatalf("open store: %v", err)
75+
}
76+
t.Cleanup(func() { _ = store.Close() })
77+
return store
78+
}

internal/health/server.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package health
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"time"
8+
)
9+
10+
type Checker struct {
11+
DBPing func(ctx context.Context) error
12+
RPCPing func(ctx context.Context) error
13+
}
14+
15+
// Serve starts a minimal /healthz handler.
16+
func Serve(addr string, checker Checker) *http.Server {
17+
mux := http.NewServeMux()
18+
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
19+
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
20+
defer cancel()
21+
22+
status := map[string]string{"status": "ok"}
23+
code := http.StatusOK
24+
25+
if checker.DBPing != nil {
26+
if err := checker.DBPing(ctx); err != nil {
27+
status["db"] = "fail"
28+
code = http.StatusServiceUnavailable
29+
} else {
30+
status["db"] = "ok"
31+
}
32+
}
33+
if checker.RPCPing != nil {
34+
if err := checker.RPCPing(ctx); err != nil {
35+
status["rpc"] = "fail"
36+
code = http.StatusServiceUnavailable
37+
} else {
38+
status["rpc"] = "ok"
39+
}
40+
}
41+
42+
w.Header().Set("Content-Type", "application/json")
43+
w.WriteHeader(code)
44+
_ = json.NewEncoder(w).Encode(status)
45+
})
46+
47+
srv := &http.Server{
48+
Addr: addr,
49+
Handler: mux,
50+
ReadHeaderTimeout: 3 * time.Second,
51+
}
52+
go func() { _ = srv.ListenAndServe() }()
53+
return srv
54+
}
55+

internal/logging/logging.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package logging
2+
3+
import (
4+
"log/slog"
5+
"os"
6+
"strings"
7+
)
8+
9+
// New returns a minimal structured logger with secret redaction.
10+
func New() *slog.Logger {
11+
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
12+
Level: slog.LevelInfo,
13+
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
14+
if isSecretKey(a.Key) {
15+
a.Value = slog.StringValue("[redacted]")
16+
}
17+
return a
18+
},
19+
})
20+
return slog.New(handler)
21+
}
22+
23+
func isSecretKey(k string) bool {
24+
k = strings.ToLower(k)
25+
return strings.Contains(k, "token") || strings.Contains(k, "secret") || strings.Contains(k, "key") || strings.Contains(k, "pass")
26+
}
27+

tasks.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ sinks:
115115
- [x] Templates via `text/template`; helpers `pretty_json`, `short_addr`.
116116
- AC: Duplicate events do not resend (dedupe key honored); sink errors retried with backoff.
117117
- Phase H — Replay & Dry-Run
118-
- [ ] `--from/--to` historical scan; progress output; backpressure.
119-
- [ ] `--dry-run` writes alerts only; no network sends.
118+
- [x] `--from/--to` historical scan; progress output; backpressure.
119+
- [x] `--dry-run` writes alerts only; no network sends.
120120
- AC: 5k-block replay yields expected count; zero sink calls in dry-run; deterministic reruns.
121121
- Phase I — Ops (only needed)
122122
- [ ] `/healthz` HTTP (db, RPCs); optional `/metrics` Prometheus counters.

0 commit comments

Comments
 (0)