@@ -13,6 +13,7 @@ import (
1313
1414 "github.com/c9s/bbgo/pkg/bbgo"
1515 "github.com/c9s/bbgo/pkg/core"
16+ "github.com/c9s/bbgo/pkg/exchange/retry"
1617 "github.com/c9s/bbgo/pkg/fixedpoint"
1718 "github.com/c9s/bbgo/pkg/service"
1819 "github.com/c9s/bbgo/pkg/strategy/xmaker/pricer"
@@ -133,6 +134,8 @@ type HedgeMarket struct {
133134 market types.Market
134135 stream types.Stream
135136
137+ orderQueryService types.ExchangeOrderQueryService
138+
136139 connectivity * types.Connectivity
137140
138141 book * types.StreamOrderBook
@@ -210,6 +213,8 @@ func NewHedgeMarket(
210213 }
211214 logger := log .WithFields (logFields )
212215
216+ orderQueryService , _ := session .Exchange .(types.ExchangeOrderQueryService )
217+
213218 m := & HedgeMarket {
214219 HedgeMarketConfig : config ,
215220 session : session ,
@@ -218,6 +223,8 @@ func NewHedgeMarket(
218223 book : book ,
219224 depthBook : depthBook ,
220225
226+ orderQueryService : orderQueryService ,
227+
221228 connectivity : connectivity ,
222229
223230 positionExposure : NewPositionExposure (symbol ),
@@ -268,6 +275,46 @@ func (m *HedgeMarket) SetLogger(logger logrus.FieldLogger) {
268275 })
269276}
270277
278+ func (m * HedgeMarket ) syncOrder (ctx context.Context , order types.Order ) (* types.Order , error ) {
279+ updatedOrder , err := retry .QueryOrderUntilSuccessful (ctx , m .orderQueryService , order .AsQuery ())
280+ if err != nil {
281+ return nil , fmt .Errorf ("unable to query order #%d: %w" , order .OrderID , err )
282+ } else if updatedOrder != nil {
283+ if updatedOrder .Quantity .Compare (order .Quantity ) != 0 {
284+ m .logger .WithFields (order .LogFields ()).
285+ Warnf ("order quantity changed from %s to %s for order #%d" ,
286+ order .Quantity .String (), updatedOrder .Quantity .String (), order .OrderID )
287+ }
288+ }
289+
290+ orderTrades , err := retry .QueryOrderTradesUntilSuccessful (ctx , m .orderQueryService , order .AsQuery ())
291+ if err != nil {
292+ return updatedOrder , fmt .Errorf ("unable to query order #%d trades: %w" , order .OrderID , err )
293+ }
294+
295+ for _ , trade := range orderTrades {
296+ m .tradeCollector .ProcessTrade (trade )
297+ }
298+
299+ return updatedOrder , nil
300+ }
301+
302+ func (m * HedgeMarket ) syncHistoryOrder (ctx context.Context ) {
303+ historyOrders := m .orderStore .Orders ()
304+ m .logger .Infof ("loaded %d historical orders" , len (historyOrders ))
305+
306+ for _ , order := range historyOrders {
307+ updatedOrder , err := m .syncOrder (ctx , order )
308+ if err != nil {
309+ m .logger .WithError (err ).WithFields (order .LogFields ()).Warnf ("failed to sync order" )
310+ } else if updatedOrder != nil {
311+ m .orderStore .Update (* updatedOrder )
312+ }
313+ }
314+
315+ m .orderStore .Prune (8 * time .Hour )
316+ }
317+
271318// SetPriceFeeMode sets the fee mode used when computing hedge quote prices.
272319// This method is safe to call at runtime.
273320func (m * HedgeMarket ) SetPriceFeeMode (mode FeeMode ) {
@@ -646,6 +693,17 @@ func (m *HedgeMarket) calculateDebtQuota(totalValue, debtValue, minMarginLevel,
646693 return debtQuota
647694}
648695
696+ // Send sends the position delta to the hedge market.
697+ func (m * HedgeMarket ) Send (delta fixedpoint.Value ) {
698+ select {
699+ case m .positionDeltaC <- delta :
700+ case <- time .After (30 * time .Second ):
701+ m .logger .Warnf ("position delta channel is full, dropping delta: %f" , delta .Float64 ())
702+ }
703+ }
704+
705+ // hedge executes the hedge logic to adjust the position exposure to zero.
706+ // you should pass the position delta to the positionDeltaC channel
649707func (m * HedgeMarket ) hedge (ctx context.Context ) error {
650708 if err := m .hedgeExecutor .Clear (ctx ); err != nil {
651709 return fmt .Errorf ("failed to clear hedge executor: %w" , err )
0 commit comments