44import dataclasses
55import logging
66import queue
7+ import struct
78import threading
89from collections .abc import Iterable
910from typing import IO
1011from typing import Callable
1112from typing import Final
1213
1314import databento_dbn
14- from databento_dbn import Metadata
1515from databento_dbn import Schema
1616from databento_dbn import SType
1717
@@ -188,41 +188,45 @@ def __init__(
188188 ts_out : bool = False ,
189189 heartbeat_interval_s : int | None = None ,
190190 ):
191- super ().__init__ (api_key , dataset , ts_out )
191+ super ().__init__ (api_key , dataset , ts_out , heartbeat_interval_s )
192192
193193 self ._dbn_queue = dbn_queue
194194 self ._loop = loop
195195 self ._metadata : SessionMetadata = metadata
196196 self ._user_callbacks = user_callbacks
197197 self ._user_streams = user_streams
198198
199- def _process_dbn (self , data : bytes ) -> None :
200- start_index = 0
201- if data .startswith (b"DBN" ) and self ._metadata :
202- # We have already received metata for the stream
203- # Set start index to metadata length
204- start_index = int .from_bytes (data [4 :8 ], byteorder = "little" ) + 8
205- self ._metadata .check (Metadata .decode (bytes (data [:start_index ])))
206- for stream , exc_callback in self ._user_streams .items ():
207- try :
208- stream .write (data [start_index :])
209- except Exception as exc :
210- stream_name = getattr (stream , "name" , str (stream ))
211- logger .error (
212- "error writing %d bytes to `%s` stream" ,
213- len (data [start_index :]),
214- stream_name ,
215- exc_info = exc ,
216- )
217- if exc_callback is not None :
218- exc_callback (exc )
219- return super ()._process_dbn (data )
220-
221199 def received_metadata (self , metadata : databento_dbn .Metadata ) -> None :
222- self ._metadata .data = metadata
200+ if self ._metadata :
201+ self ._metadata .check (metadata )
202+ else :
203+ metadata_bytes = metadata .encode ()
204+ for stream , exc_callback in self ._user_streams .items ():
205+ try :
206+ stream .write (metadata_bytes )
207+ except Exception as exc :
208+ stream_name = getattr (stream , "name" , str (stream ))
209+ logger .error (
210+ "error writing %d bytes to `%s` stream" ,
211+ len (metadata_bytes ),
212+ stream_name ,
213+ exc_info = exc ,
214+ )
215+ if exc_callback is not None :
216+ exc_callback (exc )
217+
218+ self ._metadata .data = metadata
223219 return super ().received_metadata (metadata )
224220
225221 def received_record (self , record : DBNRecord ) -> None :
222+ self ._dispatch_writes (record )
223+ self ._dispatch_callbacks (record )
224+ if self ._dbn_queue .is_enabled ():
225+ self ._queue_for_iteration (record )
226+
227+ return super ().received_record (record )
228+
229+ def _dispatch_callbacks (self , record : DBNRecord ) -> None :
226230 for callback , exc_callback in self ._user_callbacks .items ():
227231 try :
228232 callback (record )
@@ -236,18 +240,37 @@ def received_record(self, record: DBNRecord) -> None:
236240 if exc_callback is not None :
237241 exc_callback (exc )
238242
239- if self ._dbn_queue .is_enabled ():
240- self ._dbn_queue .put (record )
243+ def _dispatch_writes (self , record : DBNRecord ) -> None :
244+ if hasattr (record , "ts_out" ):
245+ ts_out_bytes = struct .pack ("Q" , record .ts_out )
246+ else :
247+ ts_out_bytes = b""
241248
242- # DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
243- if self ._dbn_queue .is_full ():
244- logger .warning (
245- "record queue is full; %d record(s) to be processed" ,
246- self ._dbn_queue .qsize (),
249+ record_bytes = bytes (record ) + ts_out_bytes
250+
251+ for stream , exc_callback in self ._user_streams .items ():
252+ try :
253+ stream .write (record_bytes )
254+ except Exception as exc :
255+ stream_name = getattr (stream , "name" , str (stream ))
256+ logger .error (
257+ "error writing %d bytes to `%s` stream" ,
258+ len (record_bytes ),
259+ stream_name ,
260+ exc_info = exc ,
247261 )
248- self .transport .pause_reading ()
262+ if exc_callback is not None :
263+ exc_callback (exc )
249264
250- return super ().received_record (record )
265+ def _queue_for_iteration (self , record : DBNRecord ) -> None :
266+ self ._dbn_queue .put (record )
267+ # DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
268+ if self ._dbn_queue .is_full ():
269+ logger .warning (
270+ "record queue is full; %d record(s) to be processed" ,
271+ self ._dbn_queue .qsize (),
272+ )
273+ self .transport .pause_reading ()
251274
252275
253276class Session :
0 commit comments