11package main
22
33import (
4+ "context"
45 "fmt"
6+ "net/http"
7+ "os"
58 "time"
69
710 "github.com/devblac/watch-tower/internal/config"
811 "github.com/devblac/watch-tower/internal/engine"
12+ "github.com/devblac/watch-tower/internal/health"
13+ "github.com/devblac/watch-tower/internal/logging"
14+ "github.com/devblac/watch-tower/internal/metrics"
915 "github.com/devblac/watch-tower/internal/sink"
1016 "github.com/devblac/watch-tower/internal/source/algorand"
1117 "github.com/devblac/watch-tower/internal/source/evm"
1218 "github.com/devblac/watch-tower/internal/storage"
13- "github.com/devblac/watch-tower/internal/logging"
1419 "github.com/spf13/cobra"
1520)
1621
1722var (
18- flagOnce bool
19- flagDryRun bool
20- flagFrom uint64
21- flagTo uint64
23+ flagOnce bool
24+ flagDryRun bool
25+ flagFrom uint64
26+ flagTo uint64
27+ flagHealth string
28+ flagMetrics string
2229)
2330
2431func init () {
2532 runCmd .Flags ().BoolVar (& flagOnce , "once" , false , "Process one tick and exit" )
2633 runCmd .Flags ().BoolVar (& flagDryRun , "dry-run" , false , "Do not send to sinks" )
2734 runCmd .Flags ().Uint64Var (& flagFrom , "from" , 0 , "Start from height/round override" )
2835 runCmd .Flags ().Uint64Var (& flagTo , "to" , 0 , "Stop at height/round (inclusive)" )
36+ runCmd .Flags ().StringVar (& flagHealth , "health" , "" , "Health check HTTP address (e.g., :8080)" )
37+ runCmd .Flags ().StringVar (& flagMetrics , "metrics" , "" , "Metrics HTTP address (e.g., :9090)" )
2938}
3039
3140var runCmd = & cobra.Command {
3241 Use : "run" ,
3342 Short : "Run watch-tower pipelines" ,
3443 RunE : func (cmd * cobra.Command , args []string ) error {
35- log := logging .New ()
44+ logLevel := os .Getenv ("LOG_LEVEL" )
45+ if logLevel == "" {
46+ logLevel = "info"
47+ }
48+ log := logging .NewWithLevel (logLevel )
3649 ctx := cmd .Context ()
3750
3851 cfg , err := config .Load (cfgPath )
@@ -46,8 +59,11 @@ var runCmd = &cobra.Command{
4659 }
4760 defer store .Close ()
4861
62+ evmClients := map [string ]evm.BlockClient {}
63+ algoClients := map [string ]algorand.AlgodClient {}
4964 evmScanners := map [string ]* evm.Scanner {}
5065 algoScanners := map [string ]* algorand.Scanner {}
66+
5167 for _ , src := range cfg .Sources {
5268 switch src .Type {
5369 case "evm" :
@@ -58,6 +74,7 @@ var runCmd = &cobra.Command{
5874 if err != nil {
5975 return err
6076 }
77+ evmClients [src .ID ] = cli
6178 abis , _ := evm .LoadABIs (src .ABIDirs )
6279 confirmations := cfg .Global .Confirmations ["evm" ]
6380 sc , err := evm .NewScanner (cli , store , src , confirmations , abis , cfg .Rules )
@@ -73,6 +90,7 @@ var runCmd = &cobra.Command{
7390 if err != nil {
7491 return err
7592 }
93+ algoClients [src .ID ] = cli
7694 confirmations := cfg .Global .Confirmations ["algorand" ]
7795 sc , err := algorand .NewScanner (cli , store , src , confirmations , cfg .Rules )
7896 if err != nil {
@@ -108,15 +126,53 @@ var runCmd = &cobra.Command{
108126 }
109127 }
110128
129+ var mtr * metrics.Metrics
130+ if flagMetrics != "" {
131+ mtr = metrics .Init ()
132+ log .Info ("metrics enabled" , "addr" , flagMetrics )
133+ }
134+
135+ if flagHealth != "" {
136+ rpcChecker := health .NewRPCChecker (evmClients , algoClients )
137+ healthSrv := health .Serve (flagHealth , health.Checker {
138+ DBPing : store .Ping ,
139+ RPCPing : rpcChecker .Ping ,
140+ })
141+ log .Info ("health check enabled" , "addr" , flagHealth )
142+ defer func () {
143+ shutdownCtx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
144+ defer cancel ()
145+ _ = health .Shutdown (shutdownCtx , healthSrv )
146+ }()
147+ }
148+
149+ if flagMetrics != "" {
150+ go func () {
151+ mux := http .NewServeMux ()
152+ mux .Handle ("/metrics" , metrics .Handler ())
153+ srv := & http.Server {Addr : flagMetrics , Handler : mux }
154+ if err := srv .ListenAndServe (); err != nil && err != http .ErrServerClosed {
155+ log .Error ("metrics server error" , "error" , err )
156+ }
157+ }()
158+ }
159+
111160 runner , err := engine .NewRunner (store , cfg , evmScanners , algoScanners , sinks , flagDryRun , flagFrom , flagTo )
112161 if err != nil {
113162 return err
114163 }
115164
116165 for {
117166 if err := runner .RunOnce (ctx ); err != nil {
167+ if mtr != nil {
168+ mtr .Errors ()
169+ }
170+ log .Error ("run error" , "error" , err )
118171 return err
119172 }
173+ if mtr != nil {
174+ mtr .BlocksProcessed ()
175+ }
120176 log .Info ("tick complete" , "dry_run" , flagDryRun )
121177 if flagOnce {
122178 break
0 commit comments