I made my own filesystem in the fsspec library and I am trying to read in dask dataframes from this filesystem object to open the dataframe file. However I am getting an error when I try to do this. My guess is that the dask workers don’t have a good copy of the filesystem. Here is some test code
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.rawfs=LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
_open = __wrap("open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client
with NamedTemporaryFile(mode='wt') as f, Client(): #works if I remove `, Client() (and no other client is running)`
f.write("An0")
f.flush()
print("pd, localfs",pd.read_csv(f.name).size)
print("dd, localfs",dd.read_csv(f.name).size.compute())
print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
# print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
and I get an error
2024-06-08 14:48:33,328 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-232fa012bb421939d7011c3af11ac4a7-ea46e61534d2be2ea62e2fe234f0d607', 0)
Function: execute_task
args: ((subgraph_callable-92b603c9f28a44f7e623972919b6934a, [(<function read_block_from_file at 0x75336666f420>, <OpenFile '/tmp/tmpob6dxgol'>, 0, 3, b'n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("'MyLocalFileSystem' object has no attribute 'rawfs'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[15], line 33
31 print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
32 # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
---> 33 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[15], line 10, in <lambda>()
9 def __wrap(method_name):
---> 10 return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
Is this supposed to work? How can I get this working?