From 1120a475170d7bdb1b9a58267967418158b9e475 Mon Sep 17 00:00:00 2001 From: enoreese Date: Mon, 19 May 2025 01:49:49 +0100 Subject: [PATCH 1/9] get overview and fundamental data --- tvDatafeed/main.py | 126 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 104 insertions(+), 22 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index de77ebc..b28a125 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -4,6 +4,7 @@ import logging import random import re +import regex import string import pandas as pd from websocket import create_connection @@ -37,9 +38,9 @@ class TvDatafeed: __ws_timeout = 5 def __init__( - self, - username: str = None, - password: str = None, + self, + username: str = None, + password: str = None, ) -> None: """Create TvDatafeed object @@ -87,6 +88,12 @@ def __create_connection(self): "wss://data.tradingview.com/socket.io/websocket", headers=self.__ws_headers, timeout=self.__ws_timeout ) + def __create_custom_connection(self, url): + logging.debug("creating websocket connection") + self.ws = create_connection( + url, headers=self.__ws_headers, timeout=self.__ws_timeout + ) + @staticmethod def __filter_raw_message(text): try: @@ -133,6 +140,7 @@ def __send_message(self, func, args): @staticmethod def __create_df(raw_data, symbol): try: + print(raw_data) out = re.search('"s":\[(.+?)\}\]', raw_data).group(1) x = out.split(',{"') data = list() @@ -169,6 +177,27 @@ def __create_df(raw_data, symbol): except AttributeError: logger.error("no data, please check the exchange and symbol") + def __create_overview_result(self, raw_data, symbol): + try: + raw_data = re.sub(r"~m~\d+~m~", ",", raw_data) + raw_data = f"[{raw_data[1:]}]" + matches = json.loads(raw_data) + out = {} + for group in matches: + p = group.get("p", []) + + if not p: + continue + + if isinstance(p[-1], dict): + out.update(p[-1]["v"]) + + print(f"out: {out}") + return out + except AttributeError as e: + print(e) + logger.error("no data, please check the exchange and symbol") + @staticmethod def __format_symbol(symbol, exchange, contract: int = None): @@ -186,13 +215,13 @@ def __format_symbol(symbol, exchange, contract: int = None): return symbol def get_hist( - self, - symbol: str, - exchange: str = "NSE", - interval: Interval = Interval.in_daily, - n_bars: int = 10, - fut_contract: int = None, - extended_session: bool = False, + self, + symbol: str, + exchange: str = "NSE", + interval: Interval = Interval.in_daily, + n_bars: int = 10, + fut_contract: int = None, + extended_session: bool = False, ) -> pd.DataFrame: """get historical data @@ -271,7 +300,7 @@ def get_hist( [self.chart_session, "s1", "s1", "symbol_1", interval, n_bars], ) self.__send_message("switch_timezone", [ - self.chart_session, "exchange"]) + self.chart_session, "exchange"]) raw_data = "" @@ -289,6 +318,55 @@ def get_hist( return self.__create_df(raw_data, symbol) + def get_overview( + self, + symbol: str, + exchange: str = "NSE", + fut_contract: int = None, + ) -> dict[str, object]: + """get historical data + + Args: + symbol (str): symbol name + exchange (str, optional): exchange, not required if symbol is in format EXCHANGE:SYMBOL. Defaults to None. + fut_contract (int, optional): None for cash, 1 for continuous current contract in front, 2 for continuous next contract in front . Defaults to None. + + Returns: + pd.Dataframe: dataframe with sohlcv as columns + """ + symbol = self.__format_symbol( + symbol=symbol, exchange=exchange, contract=fut_contract + ) + + date_str = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M") + url = f"wss://data.tradingview.com/socket.io/websocket?from=symbols%2F{symbol.replace(':', '-')}%2Ffinancials-revenue%2F&date={date_str}" + self.__create_custom_connection(url) + + self.__send_message("set_auth_token", [self.token]) + self.__send_message("quote_create_session", [self.session]) + + self.__send_message( + "quote_add_symbols", [self.session, symbol] + ) + self.__send_message("quote_fast_symbols", [self.session, symbol]) + + raw_data = "" + + logger.debug(f"getting data for {symbol}...") + while True: + try: + result = self.ws.recv() + raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + break + + if "series_completed" in result: + break + + # print(f"Raw: {raw_data}") + return self.__create_overview_result(raw_data, symbol) + def search_symbol(self, text: str, exchange: str = ''): url = self.__search_url.format(text, exchange) @@ -307,14 +385,18 @@ def search_symbol(self, text: str, exchange: str = ''): if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) tv = TvDatafeed() - print(tv.get_hist("CRUDEOIL", "MCX", fut_contract=1)) - print(tv.get_hist("NIFTY", "NSE", fut_contract=1)) - print( - tv.get_hist( - "EICHERMOT", - "NSE", - interval=Interval.in_1_hour, - n_bars=500, - extended_session=False, - ) - ) + print(tv.get_hist("KCB", "NSEKE")) + print(tv.get_overview( + "KCB", + "NSEKE", + )) + # print(tv.get_hist("NIFTY", "NSE", fut_contract=1)) + # print( + # tv.get_hist( + # "EICHERMOT", + # "NSE", + # interval=Interval.in_1_hour, + # n_bars=500, + # extended_session=False, + # ) + # ) From cde6b27c492680d2a4236fe7b0992542a2f4a58b Mon Sep 17 00:00:00 2001 From: enoreese Date: Fri, 6 Jun 2025 02:06:31 +0100 Subject: [PATCH 2/9] add bulk feature --- tvDatafeed/main.py | 215 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 207 insertions(+), 8 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index b28a125..de687c5 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -8,6 +8,7 @@ import string import pandas as pd from websocket import create_connection +from typing import Union, List, Dict import requests import json @@ -138,10 +139,9 @@ def __send_message(self, func, args): self.ws.send(m) @staticmethod - def __create_df(raw_data, symbol): + async def __create_df(raw_data, symbol): try: - print(raw_data) - out = re.search('"s":\[(.+?)\}\]', raw_data).group(1) + out = re.search('"s":\[\{(.+?)\}\]', raw_data).group(1) x = out.split(',{"') data = list() volume_data = True @@ -192,7 +192,20 @@ def __create_overview_result(self, raw_data, symbol): if isinstance(p[-1], dict): out.update(p[-1]["v"]) - print(f"out: {out}") + return out + except AttributeError as e: + print(e) + logger.error("no data, please check the exchange and symbol") + + def __create_overview_result_update(self, raw_data, symbol): + try: + raw_data = re.findall('"v":\{(.+?)\}~m', raw_data) + matches = [json.loads("{" + out[:-3] + "}") for out in raw_data] + out = [match for match in matches if "business_description" in match][0] + if not out: + logger.error("no data, please check the exchange and symbol") + return None + return out except AttributeError as e: print(e) @@ -318,6 +331,123 @@ def get_hist( return self.__create_df(raw_data, symbol) + async def get_hist_batch( + self, + symbols: List[str], + exchanges: List[str], + interval: Interval = Interval.in_daily, + n_bars: int = 10, + fut_contract: int = None, + extended_session: bool = False, + ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: + """get historical data + + Args: + symbols (str or list): symbol name or list of symbol names + exchanges (str or list, optional): exchange or list of exchanges. Defaults to "NSE". + interval (str, optional): chart interval. Defaults to 'D'. + n_bars (int, optional): no of bars to download, max 5000. Defaults to 10. + fut_contract (int, optional): None for cash, 1 for continuous current contract in front, + 2 for continuous next contract in front. Defaults to None. + extended_session (bool, optional): regular session if False, extended session if True. + Defaults to False. + + Returns: + Union[pd.DataFrame, Dict[str, pd.DataFrame]]: + - Single DataFrame if single symbol is provided + - Dictionary of {symbol: DataFrame} if multiple symbols are provided + """ + symbols = [ + self.__format_symbol( + symbol=symbol, exchange=exchange, contract=fut_contract + ) for symbol, exchange in zip(symbols, exchanges) + ] + symbols_string = ",".join(symbols) + + interval = interval.value + + self.__create_connection() + + self.__send_message("set_auth_token", [self.token]) + self.__send_message("chart_create_session", [self.chart_session, ""]) + self.__send_message("quote_create_session", [self.session]) + self.__send_message( + "quote_set_fields", + [ + self.session, + "ch", + "chp", + "current_session", + "description", + "local_description", + "language", + "exchange", + "fractional", + "is_tradable", + "lp", + "lp_time", + "minmov", + "minmove2", + "original_name", + "pricescale", + "pro_name", + "short_name", + "type", + "update_mode", + "volume", + "currency_code", + "rchp", + "rtc", + ], + ) + + symbol_dfs = {} + for i, symbol in enumerate(symbols): + self.__send_message( + "resolve_symbol", + [ + self.chart_session, + f"symbol_{i + 1}", + '={"symbol":"' + + symbol + + '","adjustment":"splits","session":' + + ('"regular"' if not extended_session else '"extended"') + + "}", + ], + ) + self.__send_message( + "create_series", + [self.chart_session, f"s{i + 1}", f"s{i + 1}", f"symbol_{i + 1}", interval, n_bars], + ) + + symbol_dfs[symbol] = await self.receive_create_df(symbol) + + self.__send_message( + "remove_series", + [self.chart_session, f"s{i + 1}"], + ) + + self.__send_message("switch_timezone", [ + self.chart_session, "exchange"]) + + return symbol_dfs + + async def receive_create_df(self, symbol): + raw_data = "" + logger.debug(f"getting data for {symbol}...") + while True: + try: + result = self.ws.recv() + raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + break + + if "series_completed" in result: + break + print(raw_data) + return await self.__create_df(raw_data, symbol) + def get_overview( self, symbol: str, @@ -367,6 +497,65 @@ def get_overview( # print(f"Raw: {raw_data}") return self.__create_overview_result(raw_data, symbol) + def get_overview_batch( + self, + symbols: List[str], + exchanges: List[str], + fut_contract: int = None, + ) -> dict[str, object]: + """get historical data + + Args: + symbols (str): symbol name + exchanges (str, optional): exchange, not required if symbol is in format EXCHANGE:SYMBOL. Defaults to None. + fut_contract (int, optional): None for cash, 1 for continuous current contract in front, 2 for continuous next contract in front . Defaults to None. + + Returns: + pd.Dataframe: dataframe with sohlcv as columns + """ + symbols = [ + self.__format_symbol( + symbol=symbol, exchange=exchange, contract=fut_contract + ) for symbol, exchange in zip(symbols, exchanges) + ] + symbols_string = ",".join(symbols) + symbols_string_encoded = "%2F".join(symbols) + + date_str = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M") + url = f"wss://data.tradingview.com/socket.io/websocket?from=symbols%2F{symbols_string_encoded.replace(':', '-')}%2Ffinancials-revenue%2F&date={date_str}" + print(url) + self.__create_custom_connection(url) + + self.__send_message("set_auth_token", [self.token]) + self.__send_message("quote_create_session", [self.session]) + + symbol_dict = {} + for i, symbol in enumerate(symbols): + self.__send_message( + "quote_add_symbols", [self.session, symbol] + ) + self.__send_message("quote_fast_symbols", [self.session, symbol]) + + raw_data = "" + + logger.debug(f"getting data for {symbol}...") + while True: + try: + result = self.ws.recv() + raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + break + + if "series_completed" in result: + break + + # print(f"Raw-{symbol}: {raw_data}") + + symbol_dict[symbols[i]] = self.__create_overview_result_update(raw_data, symbol) + + return symbol_dict + def search_symbol(self, text: str, exchange: str = ''): url = self.__search_url.format(text, exchange) @@ -383,12 +572,22 @@ def search_symbol(self, text: str, exchange: str = ''): if __name__ == "__main__": + import asyncio + logging.basicConfig(level=logging.DEBUG) tv = TvDatafeed() - print(tv.get_hist("KCB", "NSEKE")) - print(tv.get_overview( - "KCB", - "NSEKE", + # print(tv.get_hist("KCB", "NSEKE")) + # print(tv.get_overview( + # "KCB", + # "NSEKE", + # )) + # print(asyncio.run(tv.get_hist_batch( + # symbols=["MTNN", "GTCO"], + # exchanges=["NSENG", "NSENG"] + # ))) + print(tv.get_overview_batch( + symbols=["MTNN", "GTCO"], + exchanges=["NSENG", "NSENG"] )) # print(tv.get_hist("NIFTY", "NSE", fut_contract=1)) # print( From 1cb0c7540a98426625c45aa8871e3dc87623f4b3 Mon Sep 17 00:00:00 2001 From: enoreese Date: Sat, 7 Jun 2025 13:56:00 +0100 Subject: [PATCH 3/9] add stream feature --- tvDatafeed/main.py | 259 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 246 insertions(+), 13 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index de687c5..39e1cbb 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -1,4 +1,5 @@ import datetime +import asyncio import enum import json import logging @@ -7,8 +8,10 @@ import regex import string import pandas as pd +from tqdm import tqdm from websocket import create_connection -from typing import Union, List, Dict +from websocket import WebSocketException, WebSocketConnectionClosedException +from typing import Union, List, Dict, Any import requests import json @@ -89,6 +92,10 @@ def __create_connection(self): "wss://data.tradingview.com/socket.io/websocket", headers=self.__ws_headers, timeout=self.__ws_timeout ) + def create_connection(self): + self.__create_connection() + return self.ws + def __create_custom_connection(self, url): logging.debug("creating websocket connection") self.ws = create_connection( @@ -139,7 +146,7 @@ def __send_message(self, func, args): self.ws.send(m) @staticmethod - async def __create_df(raw_data, symbol): + async def __create_df(raw_data, symbol, add_s_field=False): try: out = re.search('"s":\[\{(.+?)\}\]', raw_data).group(1) x = out.split(',{"') @@ -148,7 +155,7 @@ async def __create_df(raw_data, symbol): for xi in x: xi = re.split("\[|:|,|\]", xi) - ts = datetime.datetime.fromtimestamp(float(xi[4])) + ts = datetime.datetime.fromtimestamp(float(xi[4])).isoformat() row = [ts] @@ -171,8 +178,14 @@ async def __create_df(raw_data, symbol): data = pd.DataFrame( data, columns=["datetime", "open", "high", "low", "close", "volume"] - ).set_index("datetime") + ) data.insert(0, "symbol", value=symbol) + + if add_s_field: + s_field = re.findall(r'"\s*(s\d+)"\s*:', raw_data) + s_field = s_field[0] + data["s_field"] = s_field + return data except AttributeError: logger.error("no data, please check the exchange and symbol") @@ -197,11 +210,15 @@ def __create_overview_result(self, raw_data, symbol): print(e) logger.error("no data, please check the exchange and symbol") - def __create_overview_result_update(self, raw_data, symbol): + def __create_overview_result_update(self, raw_data, single_output=True): try: raw_data = re.findall('"v":\{(.+?)\}~m', raw_data) matches = [json.loads("{" + out[:-3] + "}") for out in raw_data] - out = [match for match in matches if "business_description" in match][0] + if single_output: + out = [match for match in matches if "business_description" in match][0] + else: + out = [match for match in matches if "business_description" in match] + if not out: logger.error("no data, please check the exchange and symbol") return None @@ -339,6 +356,7 @@ async def get_hist_batch( n_bars: int = 10, fut_contract: int = None, extended_session: bool = False, + session=None ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """get historical data @@ -366,7 +384,10 @@ async def get_hist_batch( interval = interval.value - self.__create_connection() + if not session: + self.__create_connection() + else: + self.ws = session self.__send_message("set_auth_token", [self.token]) self.__send_message("chart_create_session", [self.chart_session, ""]) @@ -432,6 +453,136 @@ async def get_hist_batch( return symbol_dfs + def is_ws_connected(self): + try: + self.ws.recv() # Attempt to receive data, which updates ws.connected + return True + except WebSocketException: + return False + + async def get_hist_stream( + self, + symbols: List[str], + exchanges: List[str], + interval: Interval = Interval.in_daily, + n_bars: int = 10, + fut_contract: int = None, + extended_session: bool = False, + on_message: Any = None, + params: dict = None + ): + """get historical data + + Args: + symbols (str or list): symbol name or list of symbol names + exchanges (str or list, optional): exchange or list of exchanges. Defaults to "NSE". + interval (str, optional): chart interval. Defaults to 'D'. + n_bars (int, optional): no of bars to download, max 5000. Defaults to 10. + fut_contract (int, optional): None for cash, 1 for continuous current contract in front, + 2 for continuous next contract in front. Defaults to None. + extended_session (bool, optional): regular session if False, extended session if True. + Defaults to False. + + Returns: + Union[pd.DataFrame, Dict[str, pd.DataFrame]]: + - Single DataFrame if single symbol is provided + - Dictionary of {symbol: DataFrame} if multiple symbols are provided + """ + symbols = [ + self.__format_symbol( + symbol=symbol, exchange=exchange, contract=fut_contract + ) for symbol, exchange in zip(symbols, exchanges) + ] + + interval = interval.value + + self.__create_connection() + + self.__send_message("set_auth_token", [self.token]) + + raw_data = "" + symbol_dfs = {} + + logger.debug(f"getting data for {len(symbols)} symbols...") + while True: + # try and check if self.ws is not none and is still open + if not self.is_ws_connected(): + self.__create_connection() + + for i, symbol in tqdm(enumerate(symbols)): + chart_session = self.__generate_chart_session() + session = self.__generate_session() + + self.__send_message("chart_create_session", [chart_session, ""]) + self.__send_message("quote_create_session", [session]) + self.__send_message( + "quote_set_fields", + [ + session, + "ch", + "chp", + "current_session", + "description", + "local_description", + "language", + "exchange", + "fractional", + "is_tradable", + "lp", + "lp_time", + "minmov", + "minmove2", + "original_name", + "pricescale", + "pro_name", + "short_name", + "type", + "update_mode", + "volume", + "currency_code", + "rchp", + "rtc", + ], + ) + self.__send_message( + "resolve_symbol", + [ + chart_session, + f"symbol_{i + 1}", + '={"symbol":"' + + symbol + + '","adjustment":"splits","session":' + + ('"regular"' if not extended_session else '"extended"') + + "}", + ], + ) + self.__send_message( + "create_series", + [chart_session, f"s{i + 1}", f"s{i + 1}", f"symbol_{i + 1}", interval, n_bars], + ) + + try: + result = self.ws.recv() + await asyncio.sleep(1.5) + raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + break + + if "series_completed" in result: + symbol_df = await self.__create_df(raw_data, symbol, add_s_field=True) + symbol_df.reset_index(inplace=True) + symbol_dfs[symbol] = symbol_df + + if on_message: + on_message.delay(symbol_df.to_dict('records'), **params) + + self.__send_message( + "remove_series", + [chart_session, f"s{i + 1}"], + ) + raw_data = "" + async def receive_create_df(self, symbol): raw_data = "" logger.debug(f"getting data for {symbol}...") @@ -523,7 +674,6 @@ def get_overview_batch( date_str = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M") url = f"wss://data.tradingview.com/socket.io/websocket?from=symbols%2F{symbols_string_encoded.replace(':', '-')}%2Ffinancials-revenue%2F&date={date_str}" - print(url) self.__create_custom_connection(url) self.__send_message("set_auth_token", [self.token]) @@ -556,6 +706,85 @@ def get_overview_batch( return symbol_dict + async def get_overview_stream( + self, + symbols: List[str], + exchanges: List[str], + fut_contract: int = None, + on_message: Any = None, + params: dict = None + ): + """get historical data + + Args: + symbols (str): symbol name + exchanges (str, optional): exchange, not required if symbol is in format EXCHANGE:SYMBOL. Defaults to None. + fut_contract (int, optional): None for cash, 1 for continuous current contract in front, 2 for continuous next contract in front . Defaults to None. + + Returns: + pd.Dataframe: dataframe with sohlcv as columns + """ + symbols = [ + self.__format_symbol( + symbol=symbol, exchange=exchange, contract=fut_contract + ) for symbol, exchange in zip(symbols, exchanges) + ] + symbols_string_encoded = "%2F".join(symbols) + + date_str = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M") + url = f"wss://data.tradingview.com/socket.io/websocket?from=symbols%2F{symbols_string_encoded.replace(':', '-')}%2Ffinancials-revenue%2F&date={date_str}" + self.__create_custom_connection(url) + + self.__send_message("set_auth_token", [self.token]) + self.__send_message("quote_create_session", [self.session]) + + raw_data = "" + quoted, end_session = False, False + quoted_tickers = list(set(symbols)) + while True: + if (len(quoted_tickers) == 0) or end_session: + break + + for i, symbol in enumerate(list(quoted_tickers)): + if quoted: + continue + self.__send_message( + "quote_add_symbols", [self.session, symbol] + ) + self.__send_message("quote_fast_symbols", [self.session, symbol]) + + logger.debug(f"getting data for {symbol}...") + await asyncio.sleep(1.5) + else: + quoted = True + try: + result = self.ws.recv() + await asyncio.sleep(1.5) + raw_data = raw_data + result + "\n" + except WebSocketException as e: + logger.error(e) + self.__create_custom_connection(url) + self.__send_message("set_auth_token", [self.token]) + self.__send_message("quote_create_session", [self.session]) + + except Exception as e: + logger.error(e) + break + + sym_list = self.__create_overview_result_update(raw_data, single_output=False) + if sym_list is not None: + tickers = [tick["symbol-proname"] for tick in sym_list] + + quoted_tickers = [tick for tick in quoted_tickers if tick not in tickers] + + if len(quoted_tickers) == 0: + end_session = True + + if on_message: + on_message.delay(sym_list, **params) + + raw_data = "" + def search_symbol(self, text: str, exchange: str = ''): url = self.__search_url.format(text, exchange) @@ -572,8 +801,6 @@ def search_symbol(self, text: str, exchange: str = ''): if __name__ == "__main__": - import asyncio - logging.basicConfig(level=logging.DEBUG) tv = TvDatafeed() # print(tv.get_hist("KCB", "NSEKE")) @@ -581,14 +808,20 @@ def search_symbol(self, text: str, exchange: str = ''): # "KCB", # "NSEKE", # )) - # print(asyncio.run(tv.get_hist_batch( + # print(asyncio.run(tv.get_hist_stream( # symbols=["MTNN", "GTCO"], # exchanges=["NSENG", "NSENG"] # ))) - print(tv.get_overview_batch( + + print(asyncio.run(tv.get_overview_stream( symbols=["MTNN", "GTCO"], exchanges=["NSENG", "NSENG"] - )) + ))) + + # print(tv.get_overview_batch( + # symbols=["MTNN", "GTCO"], + # exchanges=["NSENG", "NSENG"] + # )) # print(tv.get_hist("NIFTY", "NSE", fut_contract=1)) # print( # tv.get_hist( From eb89c74a1061d5448ecd20a9fda878763621aa3e Mon Sep 17 00:00:00 2001 From: enoreese Date: Sat, 7 Jun 2025 14:16:33 +0100 Subject: [PATCH 4/9] update-typing --- tvDatafeed/main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index 39e1cbb..3447854 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -469,7 +469,7 @@ async def get_hist_stream( fut_contract: int = None, extended_session: bool = False, on_message: Any = None, - params: dict = None + params: Dict[str, Any] = None ): """get historical data @@ -604,7 +604,7 @@ def get_overview( symbol: str, exchange: str = "NSE", fut_contract: int = None, - ) -> dict[str, object]: + ) -> Dict[str, object]: """get historical data Args: @@ -653,7 +653,7 @@ def get_overview_batch( symbols: List[str], exchanges: List[str], fut_contract: int = None, - ) -> dict[str, object]: + ) -> Dict[str, object]: """get historical data Args: @@ -712,7 +712,7 @@ async def get_overview_stream( exchanges: List[str], fut_contract: int = None, on_message: Any = None, - params: dict = None + params: Dict[str, Any] = None ): """get historical data From 3084c7d0f19b16de22b434757286a33cacde9bdc Mon Sep 17 00:00:00 2001 From: enoreese Date: Sat, 7 Jun 2025 15:26:10 +0100 Subject: [PATCH 5/9] catch exception in send message --- tvDatafeed/main.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index 3447854..f4e6a9c 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -140,10 +140,18 @@ def __create_message(self, func, paramList): return self.__prepend_header(self.__construct_message(func, paramList)) def __send_message(self, func, args): - m = self.__create_message(func, args) - if self.ws_debug: - print(m) - self.ws.send(m) + try: + m = self.__create_message(func, args) + if self.ws_debug: + print(m) + self.ws.send(m) + except WebSocketConnectionClosedException as e: + logger.error(e) + self.__create_connection() + m = self.__create_message(func, args) + if self.ws_debug: + print(m) + self.ws.send(m) @staticmethod async def __create_df(raw_data, symbol, add_s_field=False): From 0d6fa11c6fc02097b04dc339ab9b005162029e88 Mon Sep 17 00:00:00 2001 From: enoreese Date: Sat, 7 Jun 2025 16:01:28 +0100 Subject: [PATCH 6/9] add rate limit --- tvDatafeed/main.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index f4e6a9c..77b6b24 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -513,11 +513,9 @@ async def get_hist_stream( logger.debug(f"getting data for {len(symbols)} symbols...") while True: - # try and check if self.ws is not none and is still open - if not self.is_ws_connected(): - self.__create_connection() - for i, symbol in tqdm(enumerate(symbols)): + await asyncio.sleep(3) + chart_session = self.__generate_chart_session() session = self.__generate_session() @@ -573,11 +571,19 @@ async def get_hist_stream( result = self.ws.recv() await asyncio.sleep(1.5) raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + self.__create_connection() + self.__send_message("set_auth_token", [self.token]) + result = self.ws.recv() + await asyncio.sleep(1.5) + raw_data = raw_data + result + "\n" + except Exception as e: logger.error(e) break - if "series_completed" in result: + if "series_completed" in result: symbol_df = await self.__create_df(raw_data, symbol, add_s_field=True) symbol_df.reset_index(inplace=True) symbol_dfs[symbol] = symbol_df @@ -591,6 +597,8 @@ async def get_hist_stream( ) raw_data = "" + await asyncio.sleep(1.5) + async def receive_create_df(self, symbol): raw_data = "" logger.debug(f"getting data for {symbol}...") @@ -756,13 +764,15 @@ async def get_overview_stream( for i, symbol in enumerate(list(quoted_tickers)): if quoted: continue + + await asyncio.sleep(3) self.__send_message( "quote_add_symbols", [self.session, symbol] ) self.__send_message("quote_fast_symbols", [self.session, symbol]) logger.debug(f"getting data for {symbol}...") - await asyncio.sleep(1.5) + await asyncio.sleep(3) else: quoted = True try: @@ -793,6 +803,8 @@ async def get_overview_stream( raw_data = "" + await asyncio.sleep(1.5) + def search_symbol(self, text: str, exchange: str = ''): url = self.__search_url.format(text, exchange) From d7f057016f82a69ccf34c9d87b4b534599742827 Mon Sep 17 00:00:00 2001 From: enoreese Date: Sat, 7 Jun 2025 17:21:48 +0100 Subject: [PATCH 7/9] update batch --- tvDatafeed/main.py | 50 +++++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index 77b6b24..afff3bb 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -364,7 +364,8 @@ async def get_hist_batch( n_bars: int = 10, fut_contract: int = None, extended_session: bool = False, - session=None + on_message: Any = None, + params: Dict[str, Any] = None ) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]: """get historical data @@ -388,14 +389,10 @@ async def get_hist_batch( symbol=symbol, exchange=exchange, contract=fut_contract ) for symbol, exchange in zip(symbols, exchanges) ] - symbols_string = ",".join(symbols) interval = interval.value - if not session: - self.__create_connection() - else: - self.ws = session + self.__create_connection() self.__send_message("set_auth_token", [self.token]) self.__send_message("chart_create_session", [self.chart_session, ""]) @@ -432,6 +429,7 @@ async def get_hist_batch( symbol_dfs = {} for i, symbol in enumerate(symbols): + await asyncio.sleep(2) self.__send_message( "resolve_symbol", [ @@ -449,12 +447,30 @@ async def get_hist_batch( [self.chart_session, f"s{i + 1}", f"s{i + 1}", f"symbol_{i + 1}", interval, n_bars], ) - symbol_dfs[symbol] = await self.receive_create_df(symbol) + raw_data = "" + logger.debug(f"getting data for {symbol}...") + while True: + try: + result = self.ws.recv() + raw_data = raw_data + result + "\n" + except Exception as e: + logger.error(e) + break - self.__send_message( - "remove_series", - [self.chart_session, f"s{i + 1}"], - ) + if "series_completed" in result: + symbol_df = await self.__create_df(raw_data, symbol, add_s_field=True) + symbol_df.reset_index(inplace=True) + symbol_dfs[symbol] = symbol_df + + if on_message: + on_message.delay(symbol_df.to_dict('records'), **params) + + self.__send_message( + "remove_series", + [self.chart_session, f"s{i + 1}"], + ) + + break self.__send_message("switch_timezone", [ self.chart_session, "exchange"]) @@ -664,11 +680,13 @@ def get_overview( # print(f"Raw: {raw_data}") return self.__create_overview_result(raw_data, symbol) - def get_overview_batch( + async def get_overview_batch( self, symbols: List[str], exchanges: List[str], fut_contract: int = None, + on_message: Any = None, + params: Dict[str, Any] = None ) -> Dict[str, object]: """get historical data @@ -697,6 +715,7 @@ def get_overview_batch( symbol_dict = {} for i, symbol in enumerate(symbols): + await asyncio.sleep(2) self.__send_message( "quote_add_symbols", [self.session, symbol] ) @@ -713,8 +732,11 @@ def get_overview_batch( logger.error(e) break - if "series_completed" in result: - break + if "quote_completed" in result: + sym_list = self.__create_overview_result_update(raw_data, single_output=False) + + if on_message: + on_message.delay(sym_list, **params) # print(f"Raw-{symbol}: {raw_data}") From 4c82a4f4fe600c9d89f5474e639b7e01a5356842 Mon Sep 17 00:00:00 2001 From: enoreese Date: Thu, 31 Jul 2025 22:18:12 +0100 Subject: [PATCH 8/9] update overview batch --- tvDatafeed/main.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index afff3bb..bc0f0ad 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -221,7 +221,7 @@ def __create_overview_result(self, raw_data, symbol): def __create_overview_result_update(self, raw_data, single_output=True): try: raw_data = re.findall('"v":\{(.+?)\}~m', raw_data) - matches = [json.loads("{" + out[:-3] + "}") for out in raw_data] + matches = [json.loads("{" + out[:-3] + "}") for out in raw_data if len(out) > 0] if single_output: out = [match for match in matches if "business_description" in match][0] else: @@ -734,14 +734,12 @@ async def get_overview_batch( if "quote_completed" in result: sym_list = self.__create_overview_result_update(raw_data, single_output=False) + for sym in sym_list: + symbol_dict[sym['short_name']] = sym if on_message: on_message.delay(sym_list, **params) - # print(f"Raw-{symbol}: {raw_data}") - - symbol_dict[symbols[i]] = self.__create_overview_result_update(raw_data, symbol) - return symbol_dict async def get_overview_stream( From 7bd4466ba5af62d500d3ebdc3164ac9a39a851ed Mon Sep 17 00:00:00 2001 From: enoreese Date: Thu, 31 Jul 2025 23:27:25 +0100 Subject: [PATCH 9/9] update overview batch --- tvDatafeed/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index bc0f0ad..bdc365b 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -734,6 +734,8 @@ async def get_overview_batch( if "quote_completed" in result: sym_list = self.__create_overview_result_update(raw_data, single_output=False) + if not sym_list: + continue for sym in sym_list: symbol_dict[sym['short_name']] = sym