import io
import os
import shutil
import posixpath
import re
import tempfile
from fsspec import AbstractFileSystem
from fsspec.utils import stringify_path
class LocalFileSystem(AbstractFileSystem):
    """Interface to files on local storage

    Parameters
    ----------
    auto_mkdir: bool
        Whether, when opening a file in a mode that can create it
        ("w", "a" or "x"), the directory containing it should be created
        (if it doesn't already exist). This is assumed by pyarrow code.
    """

    root_marker = "/"

    def __init__(self, auto_mkdir=True, **kwargs):
        super().__init__(**kwargs)
        self.auto_mkdir = auto_mkdir

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory.

        Parameters
        ----------
        path: str or path-like
            Directory to create; a leading ``file://`` is removed.
        create_parents: bool
            If True, delegate to ``makedirs`` with ``exist_ok=True``;
            otherwise call ``os.mkdir`` for the leaf directory only.
        kwargs:
            Passed to ``os.mkdir`` when ``create_parents`` is False.
        """
        path = self._strip_protocol(path)
        if create_parents:
            self.makedirs(path, exist_ok=True)
        else:
            os.mkdir(path, **kwargs)

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and any missing intermediate directories."""
        path = self._strip_protocol(path)
        os.makedirs(path, exist_ok=exist_ok)

    def rmdir(self, path):
        """Remove the directory ``path`` using ``os.rmdir``."""
        # Normalise like the sibling methods so "file://" URLs and
        # path-like objects are accepted here too.
        path = self._strip_protocol(path)
        os.rmdir(path)

    def ls(self, path, detail=False, **kwargs):
        """List the entries of the directory ``path``.

        Parameters
        ----------
        path: str or path-like
            Directory to list.
        detail: bool
            If True, return one ``info`` dict per entry; otherwise return
            the entry paths as strings.
        kwargs:
            Accepted and ignored, so that options forwarded by generic
            callers do not raise.
        """
        path = self._strip_protocol(path)
        paths = [posixpath.join(path, name) for name in os.listdir(path)]
        if detail:
            return [self.info(entry) for entry in paths]
        return paths

    def glob(self, path, **kwargs):
        """Normalise ``path`` and delegate to the parent class ``glob``.

        Keyword arguments are forwarded to the parent implementation
        rather than being dropped.
        """
        path = self._strip_protocol(path)
        return super().glob(path, **kwargs)

    def info(self, path, **kwargs):
        """Return a dict describing ``path``.

        The dict holds ``name``, ``size``, ``type`` ("link", "directory",
        "file" or "other"), ``created`` (``st_ctime``), ``mode``, ``uid``,
        ``gid`` and ``mtime``. For a symbolic link it also holds
        ``destination`` and ``size`` is that of the link target, or 0 if
        the target cannot be stat'ed.
        """
        path = self._strip_protocol(path)
        # Do not follow links here, so the entry itself is described.
        out = os.stat(path, follow_symlinks=False)
        dest = None
        if os.path.islink(path):
            t = "link"
            dest = os.readlink(path)
        elif os.path.isdir(path):
            t = "directory"
        elif os.path.isfile(path):
            t = "file"
        else:
            t = "other"
        result = {
            "name": path,
            "size": out.st_size,
            "type": t,
            "created": out.st_ctime,
        }
        for field in ["mode", "uid", "gid", "mtime"]:
            result[field] = getattr(out, "st_" + field)
        if dest:
            result["destination"] = dest
            try:
                result["size"] = os.stat(path, follow_symlinks=True).st_size
            except OSError:
                # e.g. a dangling link: the target cannot be stat'ed
                result["size"] = 0
        return result

    def copy(self, path1, path2, **kwargs):
        """Copy the contents of file ``path1`` to ``path2``."""
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        shutil.copyfile(path1, path2)

    def get(self, path1, path2, **kwargs):
        """Copy ``path1`` to ``path2``.

        With ``recursive`` set in ``kwargs`` the parent class
        implementation is used; otherwise this is a plain ``copy``.
        """
        if kwargs.get("recursive"):
            return super().get(path1, path2, **kwargs)
        return self.copy(path1, path2, **kwargs)

    def put(self, path1, path2, **kwargs):
        """Copy ``path1`` to ``path2``.

        With ``recursive`` set in ``kwargs`` the parent class
        implementation is used; otherwise this is a plain ``copy``.
        """
        if kwargs.get("recursive"):
            return super().put(path1, path2, **kwargs)
        return self.copy(path1, path2, **kwargs)

    def mv(self, path1, path2, **kwargs):
        """Move ``path1`` to ``path2``."""
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        # shutil.move renames when it can and falls back to copy+delete
        # when source and destination are on different filesystems,
        # where a bare os.rename raises.
        shutil.move(path1, path2)

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete ``path``.

        Parameters
        ----------
        path: str or path-like
            File or directory to delete.
        recursive: bool
            If True and ``path`` is a directory, remove the whole tree.
        maxdepth: None
            Accepted for signature compatibility; not used here.
        """
        path = self._strip_protocol(path)
        if recursive and self.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """Return a ``LocalFileOpener`` for ``path``.

        ``block_size`` is accepted but not used. Remaining keyword
        arguments are passed to ``LocalFileOpener``.
        """
        path = self._strip_protocol(path)
        # Only create the parent directory for modes that can create the
        # file; a failed read should not leave empty directories behind.
        if self.auto_mkdir and any(flag in mode for flag in "wax"):
            self.makedirs(self._parent(path), exist_ok=True)
        return LocalFileOpener(path, mode, fs=self, **kwargs)

    def touch(self, path, **kwargs):
        """Create an empty file, or update the timestamps of an existing one."""
        path = self._strip_protocol(path)
        if self.exists(path):
            os.utime(path, None)
        else:
            open(path, "a").close()

    @classmethod
    def _parent(cls, path):
        """Return the directory containing ``path``."""
        path = cls._strip_protocol(path).rstrip("/")
        if "/" in path:
            # "/name" splits to an empty string; its parent is the root.
            return path.rsplit("/", 1)[0] or cls.root_marker
        return cls.root_marker

    @classmethod
    def _strip_protocol(cls, path):
        """Drop a leading ``file://`` and normalise via ``make_path_posix``."""
        path = stringify_path(path)
        if path.startswith("file://"):
            path = path[7:]
        return make_path_posix(path)

    def _isfilestore(self):
        # Inheriting from DaskFileSystem makes this False (S3 etc. were
        # the original motivation), but we are a posix-like file system.
        # See https://github.com/dask/dask/issues/5526
        return True
def make_path_posix(path, sep=os.sep):
    """Return ``path`` as an absolute path written with forward slashes.

    Parameters
    ----------
    path: str
        Posix or windows style path, absolute or relative.
    sep: str
        Separator used to decide whether ``path`` is relative; defaults
        to ``os.sep``.
    """
    if re.match("/[A-Za-z]:", path):
        # Left over from a windows file URI such as
        # "file:///C:/folder/file" or "file:///C:\\dir\\file":
        # drop the slash in front of the drive letter.
        path = path[1:]

    if path.startswith("\\\\"):
        # Windows UNC/DFS-style path: only flip the slashes (the
        # handling further down would strip the leading pair).
        return path.replace("\\", "/")

    drive_match = re.match("[\\\\]*[A-Za-z]:", path)
    if path.startswith("\\") or drive_match:
        # Windows full path such as "\\server\\path" or "C:\\local\\path".
        flipped = path.lstrip("\\").replace("\\", "/")
        return flipped.replace("//", "/")

    lacks_separator = sep not in path and "/" not in path
    posix_relative = sep == "/" and not path.startswith("/")
    windows_without_drive = sep == "\\" and ":" not in path
    if not (lacks_separator or posix_relative or windows_without_drive):
        return path

    # Relative path such as "path", "rel\\path" (windows) or "rel/path".
    absolute = os.path.abspath(path)
    if os.sep == "\\":
        # abspath produced more '\\' separators; normalise those too.
        return make_path_posix(absolute, sep)
    return absolute
class LocalFileOpener(object):
    """Wrapper around a builtin file object for a local path.

    Attributes not defined here are looked up on the wrapped file object.

    Parameters
    ----------
    path: str
        Location of the file.
    mode: str
        Mode passed to the builtin ``open``.
    autocommit: bool
        If False and ``mode`` contains "w", data is written to a
        temporary file which ``commit`` moves to ``path`` and ``discard``
        deletes.
    fs: object
        Filesystem whose ``info(path)`` supplies the file details;
        consulted whenever ``mode`` does not contain "w".
    kwargs:
        Accepted and ignored.
    """

    def __init__(self, path, mode, autocommit=True, fs=None, **kwargs):
        self.path = path
        self.mode = mode
        self.fs = fs
        self.f = None
        self.autocommit = autocommit
        self.blocksize = io.DEFAULT_BUFFER_SIZE
        self._open()

    def _open(self):
        """(Re)open the underlying file if it is missing or closed."""
        if self.f is None or self.f.closed:
            if self.autocommit or "w" not in self.mode:
                self.f = open(self.path, mode=self.mode)
            else:
                # TODO: check if path is writable?
                fd, name = tempfile.mkstemp()
                # mkstemp hands back an open OS-level descriptor; close
                # it, since the file is reopened by name just below.
                os.close(fd)
                self.temp = name
                self.f = open(name, mode=self.mode)
            if "w" not in self.mode:
                self.details = self.fs.info(self.path)
                self.size = self.details["size"]
                self.f.size = self.size

    def _fetch_range(self, start, end):
        """Return the bytes in ``[start, end)``; read modes only."""
        # probably only used by cached FS
        if "r" not in self.mode:
            raise ValueError
        self._open()
        self.f.seek(start)
        return self.f.read(end - start)

    def __setstate__(self, state):
        """Restore attributes; read-mode files are reopened and re-seeked."""
        state = dict(state)  # leave the caller's mapping untouched
        loc = state.pop("loc", None)
        # Attributes must be restored before reopening: _open reads
        # path, mode and fs.
        self.__dict__.update(state)
        self.f = None
        if "r" in self.mode:
            self._open()
            self.f.seek(loc)

    def __getstate__(self):
        """Return the instance state without the open file handle.

        Read-mode files record their current position under "loc".

        Raises
        ------
        ValueError
            If the file is in a non-read mode and still open.
        """
        d = self.__dict__.copy()
        d.pop("f")
        if "r" in self.mode:
            d["loc"] = self.f.tell()
        elif self.f is not None and not self.f.closed:
            raise ValueError("Cannot serialise open write-mode local file")
        return d

    def commit(self):
        """Move the temporary file to the final path."""
        if self.autocommit:
            raise RuntimeError("Can only commit if not already set to autocommit")
        # The temporary file may live on another filesystem than the
        # target, where os.rename fails; shutil.move falls back to copying.
        shutil.move(self.temp, self.path)

    def discard(self):
        """Delete the temporary file instead of committing it."""
        if self.autocommit:
            raise RuntimeError("Cannot discard if set to autocommit")
        os.remove(self.temp)

    def __fspath__(self):
        # uniquely for fsspec implementations, this is a real path
        return self.path

    def __getattr__(self, item):
        # Only reached when normal lookup fails. If "f" itself is missing
        # (instance not initialised yet), delegating to self.f would
        # recurse without end.
        if item == "f":
            raise AttributeError(item)
        return getattr(self.f, item)

    def __enter__(self):
        self._incontext = True
        return self.f.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        self._incontext = False
        self.f.__exit__(exc_type, exc_value, traceback)