88
99from lib .core .reporter import Reporter
1010
11+ _seconds = 2 ** 10
12+ TIMEOUT = aiohttp .ClientTimeout (total = _seconds , sock_connect = _seconds , sock_read = _seconds )
13+
1114
1215class StreamedAnnotations :
1316 DELIMITER = b"\\ n;)\\ n"
@@ -34,15 +37,12 @@ async def fetch(
3437 data : dict = None ,
3538 params : dict = None ,
3639 ):
37- kwargs = {
38- "params" : params ,
39- "json" : {
40- "folder_id" : params .pop ("folder_id" )
41- }
42- }
40+ kwargs = {"params" : params , "json" : {"folder_id" : params .pop ("folder_id" )}}
4341 if data :
4442 kwargs ["json" ].update (data )
45- response = await session ._request (method , url , ** kwargs , raise_for_status = True )
43+ response = await session ._request (
44+ method , url , ** kwargs , raise_for_status = True , timeout = TIMEOUT
45+ )
4646 buffer = b""
4747 async for line in response .content .iter_any ():
4848 slices = line .split (self .DELIMITER )
@@ -63,49 +63,59 @@ async def fetch(
6363 async def process_chunk (
6464 self ,
6565 method : str ,
66- session : aiohttp .ClientSession ,
6766 url : str ,
6867 data : dict = None ,
6968 params : dict = None ,
69+ verify_ssl = False ,
7070 ):
71- async for annotation in self .fetch (
72- method ,
73- session ,
74- url ,
75- self ._process_data (data ),
76- params = params ,
77- ):
78- self ._annotations .append (
79- self ._callback (annotation ) if self ._callback else annotation
80- )
71+ async with aiohttp .ClientSession (
72+ headers = self ._headers ,
73+ timeout = TIMEOUT ,
74+ connector = aiohttp .TCPConnector (ssl = verify_ssl , keepalive_timeout = 2 ** 32 ),
75+ ) as session :
76+ async for annotation in self .fetch (
77+ method ,
78+ session ,
79+ url ,
80+ self ._process_data (data ),
81+ params = params ,
82+ ):
83+ self ._annotations .append (
84+ self ._callback (annotation ) if self ._callback else annotation
85+ )
8186
8287 async def store_chunk (
8388 self ,
8489 method : str ,
85- session : aiohttp .ClientSession ,
8690 url : str ,
8791 download_path ,
8892 postfix ,
8993 data : dict = None ,
9094 params : dict = None ,
9195 ):
92- async for annotation in self .fetch (
93- method ,
94- session ,
95- url ,
96- self ._process_data (data ),
97- params = params ,
98- ):
99- self ._annotations .append (
100- self ._callback (annotation ) if self ._callback else annotation
101- )
102- self ._store_annotation (
103- download_path ,
104- postfix ,
105- annotation ,
106- self ._callback ,
107- )
108- self ._items_downloaded += 1
96+ async with aiohttp .ClientSession (
97+ raise_for_status = True ,
98+ headers = self ._headers ,
99+ timeout = TIMEOUT ,
100+ connector = aiohttp .TCPConnector (ssl = False , keepalive_timeout = 2 ** 32 ),
101+ ) as session :
102+ async for annotation in self .fetch (
103+ method ,
104+ session ,
105+ url ,
106+ self ._process_data (data ),
107+ params = params ,
108+ ):
109+ self ._annotations .append (
110+ self ._callback (annotation ) if self ._callback else annotation
111+ )
112+ self ._store_annotation (
113+ download_path ,
114+ postfix ,
115+ annotation ,
116+ self ._callback ,
117+ )
118+ self ._items_downloaded += 1
109119
110120 async def get_data (
111121 self ,
@@ -116,19 +126,19 @@ async def get_data(
116126 chunk_size : int = 5000 ,
117127 verify_ssl : bool = False ,
118128 ):
119- async with aiohttp . ClientSession (
120- headers = self . _headers ,
121- connector = aiohttp . TCPConnector ( ssl = verify_ssl ),
122- ) as session :
123- params [ "limit" ] = chunk_size
124- await asyncio . gather (
125- * [
126- self . process_chunk (
127- method = method , session = session , url = url , data = data [ i : i + chunk_size ] ,
128- params = copy . copy ( params )
129- ) for i in
130- range ( 0 , len ( data ), chunk_size ) ]
131- )
129+ params [ "limit" ] = chunk_size
130+ await asyncio . gather (
131+ * [
132+ self . process_chunk (
133+ method = method ,
134+ url = url ,
135+ data = data [ i : i + chunk_size ],
136+ params = copy . copy ( params ),
137+ verify_ssl = verify_ssl ,
138+ )
139+ for i in range ( 0 , len ( data ), chunk_size )
140+ ]
141+ )
132142
133143 return self ._annotations
134144
@@ -150,7 +160,6 @@ async def download_data(
150160 data : list ,
151161 download_path : str ,
152162 postfix : str ,
153- session ,
154163 method : str = "post" ,
155164 params = None ,
156165 chunk_size : int = 5000 ,
@@ -162,9 +171,14 @@ async def download_data(
162171 await asyncio .gather (
163172 * [
164173 self .store_chunk (
165- method = method , session = session , url = url , data = data [i : i + chunk_size ], params = copy .copy (params ),
166- download_path = download_path , postfix = postfix
167- ) for i in
168- range (0 , len (data ), chunk_size )]
174+ method = method ,
175+ url = url ,
176+ data = data [i : i + chunk_size ], # noqa
177+ params = copy .copy (params ),
178+ download_path = download_path ,
179+ postfix = postfix ,
180+ )
181+ for i in range (0 , len (data ), chunk_size )
182+ ]
169183 )
170184 return self ._items_downloaded
0 commit comments