@@ -15,6 +15,7 @@ export class PythPriceListener implements IPriceListener {
1515 private latestPriceInfo : Map < HexString , PriceInfo > ;
1616 private logger : Logger ;
1717 private lastUpdated : TimestampInMs | undefined ;
18+ private healthCheckInterval ?: NodeJS . Timeout ;
1819
1920 constructor (
2021 connection : PriceServiceConnection ,
@@ -33,26 +34,111 @@ export class PythPriceListener implements IPriceListener {
3334 // This method should be awaited on and once it finishes it has the latest value
3435 // for the given price feeds (if they exist).
3536 async start ( ) {
37+ // Set custom error handler for websocket errors
38+ this . connection . onWsError = ( error : Error ) => {
39+ if ( error . message . includes ( "not found" ) ) {
40+ // Extract invalid feed IDs from error message
41+ const match = error . message . match ( / \[ ( .* ?) \] / ) ;
42+ if ( match ) {
43+ const invalidFeedIds = match [ 1 ] . split ( "," ) . map ( ( id ) => {
44+ // Remove '0x' prefix if present to match our stored IDs
45+ return id . trim ( ) . replace ( / ^ 0 x / , "" ) ;
46+ } ) ;
47+
48+ // Log invalid feeds with their aliases
49+ invalidFeedIds . forEach ( ( id ) => {
50+ this . logger . error (
51+ `Price feed ${ id } (${ this . priceIdToAlias . get (
52+ id
53+ ) } ) not found for subscribePriceFeedUpdates`
54+ ) ;
55+ } ) ;
56+
57+ // Filter out invalid feeds and resubscribe with valid ones
58+ const validFeeds = this . priceIds . filter (
59+ ( id ) => ! invalidFeedIds . includes ( id )
60+ ) ;
61+
62+ this . priceIds = validFeeds ;
63+
64+ if ( validFeeds . length > 0 ) {
65+ this . logger . info ( "Resubscribing with valid feeds only" ) ;
66+ this . connection . subscribePriceFeedUpdates (
67+ validFeeds ,
68+ this . onNewPriceFeed . bind ( this )
69+ ) ;
70+ }
71+ }
72+ } else {
73+ this . logger . error ( "Websocket error occurred:" , error ) ;
74+ }
75+ } ;
76+
3677 this . connection . subscribePriceFeedUpdates (
3778 this . priceIds ,
3879 this . onNewPriceFeed . bind ( this )
3980 ) ;
4081
41- const priceFeeds = await this . connection . getLatestPriceFeeds ( this . priceIds ) ;
42- priceFeeds ?. forEach ( ( priceFeed ) => {
43- // Getting unchecked because although it might be old
44- // but might not be there on the target chain.
45- const latestAvailablePrice = priceFeed . getPriceUnchecked ( ) ;
46- this . latestPriceInfo . set ( priceFeed . id , {
47- price : latestAvailablePrice . price ,
48- conf : latestAvailablePrice . conf ,
49- publishTime : latestAvailablePrice . publishTime ,
82+ try {
83+ const priceFeeds = await this . connection . getLatestPriceFeeds (
84+ this . priceIds
85+ ) ;
86+ priceFeeds ?. forEach ( ( priceFeed ) => {
87+ const latestAvailablePrice = priceFeed . getPriceUnchecked ( ) ;
88+ this . latestPriceInfo . set ( priceFeed . id , {
89+ price : latestAvailablePrice . price ,
90+ conf : latestAvailablePrice . conf ,
91+ publishTime : latestAvailablePrice . publishTime ,
92+ } ) ;
5093 } ) ;
51- } ) ;
94+ } catch ( error : any ) {
95+ // Always log the HTTP error first
96+ this . logger . error ( "Failed to get latest price feeds:" , error ) ;
97+
98+ if ( error . response . data . includes ( "Price ids not found:" ) ) {
99+ // Extract invalid feed IDs from error message
100+ const invalidFeedIds = error . response . data
101+ . split ( "Price ids not found:" ) [ 1 ]
102+ . split ( "," )
103+ . map ( ( id : string ) => id . trim ( ) . replace ( / ^ 0 x / , "" ) ) ;
104+
105+ // Log invalid feeds with their aliases
106+ invalidFeedIds . forEach ( ( id : string ) => {
107+ this . logger . error (
108+ `Price feed ${ id } (${ this . priceIdToAlias . get (
109+ id
110+ ) } ) not found for getLatestPriceFeeds`
111+ ) ;
112+ } ) ;
52113
53- // Check health of the price feeds 5 second. If the price feeds are not updating
54- // for more than 30s, throw an error.
55- setInterval ( ( ) => {
114+ // Filter out invalid feeds and retry
115+ const validFeeds = this . priceIds . filter (
116+ ( id ) => ! invalidFeedIds . includes ( id )
117+ ) ;
118+
119+ this . priceIds = validFeeds ;
120+
121+ if ( validFeeds . length > 0 ) {
122+ this . logger . info (
123+ "Retrying getLatestPriceFeeds with valid feeds only"
124+ ) ;
125+ const validPriceFeeds = await this . connection . getLatestPriceFeeds (
126+ validFeeds
127+ ) ;
128+ validPriceFeeds ?. forEach ( ( priceFeed ) => {
129+ const latestAvailablePrice = priceFeed . getPriceUnchecked ( ) ;
130+ this . latestPriceInfo . set ( priceFeed . id , {
131+ price : latestAvailablePrice . price ,
132+ conf : latestAvailablePrice . conf ,
133+ publishTime : latestAvailablePrice . publishTime ,
134+ } ) ;
135+ } ) ;
136+ }
137+ }
138+ }
139+
140+ // Store health check interval reference
141+ this . healthCheckInterval = setInterval ( ( ) => {
56142 if (
57143 this . lastUpdated === undefined ||
58144 this . lastUpdated < Date . now ( ) - 30 * 1000
@@ -88,4 +174,10 @@ export class PythPriceListener implements IPriceListener {
88174 getLatestPriceInfo ( priceId : string ) : PriceInfo | undefined {
89175 return this . latestPriceInfo . get ( priceId ) ;
90176 }
177+
178+ cleanup ( ) {
179+ if ( this . healthCheckInterval ) {
180+ clearInterval ( this . healthCheckInterval ) ;
181+ }
182+ }
91183}
0 commit comments