I have a class called Series, which inherits from a base class called Fred. Series has a number of methods, which retrieve data from different API end points and process that data into properties of the object. The base class Fred simply contains a method for retrieving an API key, and methods to format requests and submit them to the server. I have written all of the relevant “get” methods using asyncio in order to improve the data retrieval time, and if used in a python script or at the iPython console this works fine. However, I would like this code to be useful in a Jupyter notebook as well. The issue I am having is that while I can add the task to the existing Jupyter notebook event loop, somewhere in the middle of executing tasks, the async tasks never seem to property return data.
Below is the relevant code:
RUN_FETCH_DATA IS RUN AT INITIALIZATION AND CAN BE RERUN:
def run_fetch_data(self, *args):
try:
loop = asyncio.get_running_loop()
except RuntimeError: # 'RuntimeError: There is no current event loop...'
loop = None
if loop and loop.is_running():
print('Async event loop already running. Adding coroutine to the event loop.')
loop.create_task(self.async_fetch_data(*args, current_loop=loop))
# # ^-- https://docs.python.org/3/library/asyncio-task.html#task-object
else:
print('Starting new event loop')
asyncio.run(self.async_fetch_data(*args))
return self
SUPPORTING FUNCTION 1 (NOTE: if kwargs is not provided, this returns the correct data):
async def async_fetch_data(self, *args, **kwargs):
if not args:
tasks = [self._fetch_series_info,
self._fetch_series_release_info,
self._fetch_series_vintagedates,
self._fetch_series_categories,
self._fetch_series_tags,
self._fetch_series_observations]
else:
tasks = []
if 'info' in args:
tasks.append(self._fetch_series_info)
else:
pass
if 'release_info' in args:
tasks.append(self._fetch_series_release_info)
else:
pass
if 'categories' in args:
tasks.append(self._fetch_series_categories)
else:
pass
if 'observations' in args:
tasks.append(self._fetch_series_observations)
else:
pass
if 'vintagedates' in args:
tasks.append(self._fetch_series_vintagedates)
if 'tags' in args:
tasks.append(self._fetch_series_tags)
else:
pass
if not kwargs or kwargs is None:
async with aiohttp.ClientSession(self._url_base) as session:
async with asyncio.TaskGroup() as tg:
for task in tasks:
tg.create_task(task(session))
elif 'current_loop' in kwargs.keys():
loop = kwargs['current_loop']
async with aiohttp.ClientSession(self._url_base) as session:
for task in tasks:
loop.create_task(task(session))
else:
raise ValueError('Keyword argument "current_loop" not found in kwargs')
return self
SUPPORTING FUNCTION 2 (other functions called in tasks are similar to this)
async def _fetch_series_info(self, session):
try:
raw_response = await self._async_fetch_data(session, self._series_info_path,
f'series_id={self.series_id}',
**self._url_options)
if isinstance(raw_response, dict):
raw_response = raw_response['seriess'][0]
elif isinstance(raw_response, ET.Element):
raw_response = raw_response[0].attrib
else:
raise TypeError('Fred response was neither JSON nor XML, series info get failed')
setattr(self, 'series_name', raw_response['title'])
setattr(self, 'observation_start', dt.date.fromisoformat(raw_response['observation_start']))
setattr(self, 'observation_end', dt.date.fromisoformat(raw_response['observation_end']))
setattr(self, 'frequency', raw_response['frequency_short'])
setattr(self, 'units', raw_response['units_short'])
setattr(self, 'seasonality_adjustment', raw_response['seasonal_adjustment_short'])
setattr(self, 'series_notes', raw_response['notes'])
setattr(self, 'update_info', dt.datetime.fromisoformat(raw_response['last_updated']))
except TypeError:
print(f'Error fetching info for series {self.series_id}')
except HTTPError as http_msg:
# TODO: verify that this actually picks up the HTTPError from parent class
print(http_msg)
finally:
return self
SUPPORTING FUNCTION 3 (this is the base caller function in the Fred base class):
async def _async_fetch_data(self, session: aiohttp.ClientSession,
query_path: str, query: str = '', **kwargs) -> dict | ET.ElementTree | ValueError:
"""
Internal function used to get and parse requests from FRED server
Parameters
----------
target_url : str
the target url including any options for the retrieval of information from FRED
Returns
-------
response : dict, XT.Element
JSON: a dictionary containing the searched data.
XML: an Element Tree or Element Tree.Element containing searched data.
"""
request_parse = url_parse.urlparse(Fred._url_base)
query_options = {'api_key': self.api_key,
'file_type': self._default_response_type,
'max_response_per_request': self._max_response_per_request,
'realtime_start': (self.realtime_start if self.realtime_start is not None else self._earliest_realtime_start),
'realtime_end': (self.realtime_end if self.realtime_end is not None else self._latest_realtime_end)}
if self.observation_end is not None:
query_options['observation_end'] = str(self.observation_end)
if self.observation_start is not None:
query_options['observation_start'] = str(self.observation_start)
for kw, arg in kwargs.items():
if self.check_optional_request_parameters(kw, arg):
query_options[kw] = arg
target_path = ''.join([query_path, '?', query, '&', url_parse.urlencode(query_options)])
target_url = url_parse.urlunparse((url_parse.urlparse(Fred._url_base)
._replace(path=url_parse.urljoin(request_parse.path, query_path))
._replace(query=''.join([query,'&',url_parse.urlencode(query_options)]))))
try:
async with session.get(target_path) as raw_response:
if query_options['file_type'] == 'json':
response = await raw_response.json()
else:
response = ET.fromstring(await raw_response.text())
except HTTPError as msg:
response = ET.fromstring(msg.read())
raise ValueError(response.get('message'))
return response
4