doc/.venv/Lib/site-packages/cerulean/local_file_system.py

222 lines
7.8 KiB
Python
Raw Normal View History

import errno
import os
import pathlib
from typing import cast, Any, Generator, Iterable, Optional
from cerulean.file_system import FileSystem
from cerulean.file_system_impl import FileSystemImpl
from cerulean.path import AbstractPath, EntryType, Path, Permission
class LocalFileSystem(FileSystemImpl):
"""Represents the local file system.
To create an instance, just call `LocalFileSystem()`.
LocalFileSystem support a single operation:
.. code-block:: python
fs / 'path'
which produces a :class:`Path`, through which you can do things \
with local files.
LocalFileSystem is a context manager, so you can use it in a \
``with`` statement, and it has a :meth:`close` method, but since \
it doesn't hold any resources, you do not need to use them. It may \
be good to do so anyway, to avoid leaks if you ever replace it with \
a different :class:`FileSystem` that does.
"""
def __eq__(self, other: Any) -> bool:
"""Return True iff this file system equals other."""
if not isinstance(other, FileSystem):
return NotImplemented
return isinstance(other, LocalFileSystem)
def root(self) -> Path:
return Path(self, pathlib.Path('/'))
def __truediv__(self, segment: str) -> Path:
# TODO: segment: Union[str, pathlib.Path]?
absseg = '/' + segment.strip('/')
path = pathlib.Path(absseg)
return Path(self, path)
def _supports(self, feature: str) -> bool:
if feature not in self._features:
raise ValueError('Invalid argument for "feature"')
return True
def _exists(self, path: AbstractPath) -> bool:
lpath = cast(pathlib.Path, path)
try:
return lpath.exists()
except OSError as e:
if e.errno == errno.ELOOP:
return False
raise
def _mkdir(self,
path: AbstractPath,
mode: Optional[int] = None,
parents: bool = False,
exists_ok: bool = False) -> None:
lpath = cast(pathlib.Path, path)
if mode is not None:
lpath.mkdir(mode, parents, exists_ok)
else:
lpath.mkdir(parents=parents, exist_ok=exists_ok)
def _iterdir(self, path: AbstractPath) -> Generator[AbstractPath, None, None]:
lpath = cast(pathlib.Path, path)
for entry in lpath.iterdir():
yield entry
def _rmdir(self, path: AbstractPath, recursive: bool = False) -> None:
lpath = cast(pathlib.Path, path)
if not lpath.is_dir():
raise RuntimeError('Path must refer to a directory')
if recursive:
for entry in lpath.iterdir():
if entry.is_symlink():
entry.unlink()
elif entry.is_dir():
self._rmdir(entry, True)
else:
entry.unlink()
lpath.rmdir()
def _touch(self, path: AbstractPath) -> None:
lpath = cast(pathlib.Path, path)
lpath.touch()
def _streaming_read(self, path: AbstractPath) -> Generator[bytes, None, None]:
# Buffer size vs. speed (MB/s) against localhost
# up down local
# 8k 33 56 159
# 16k 52 56 145
# 24k 66 57 150
# 32k 24 57 149
# 2M 24 48
# scp 120 110
# cp 172
lpath = cast(pathlib.Path, path)
with lpath.open('rb') as f:
data = f.read(24576)
while len(data) > 0:
yield data
data = f.read(24576)
def _streaming_write(self, path: AbstractPath, data: Iterable[bytes]) -> None:
lpath = cast(pathlib.Path, path)
with lpath.open('wb') as f:
for chunk in data:
f.write(chunk)
def _rename(self, path: AbstractPath, target: AbstractPath) -> None:
lpath = cast(pathlib.Path, path)
ltarget = cast(pathlib.Path, target)
lpath.replace(pathlib.Path(ltarget))
def _unlink(self, path: AbstractPath) -> None:
lpath = cast(pathlib.Path, path)
lpath.unlink()
def _is_dir(self, path: AbstractPath) -> bool:
lpath = cast(pathlib.Path, path)
return lpath.is_dir()
def _is_file(self, path: AbstractPath) -> bool:
lpath = cast(pathlib.Path, path)
return lpath.is_file()
def _is_symlink(self, path: AbstractPath) -> bool:
lpath = cast(pathlib.Path, path)
return lpath.is_symlink()
def _entry_type(self, path: AbstractPath) -> EntryType:
lpath = cast(pathlib.Path, path)
# Note: symlink goes first, because is_dir() and is_file() will
# dereference and return true, while we want to say it's a
# symlink and leave it at that.
pred_to_type = [(pathlib.Path.is_symlink, EntryType.SYMBOLIC_LINK),
(pathlib.Path.is_dir, EntryType.DIRECTORY),
(pathlib.Path.is_file, EntryType.FILE),
(pathlib.Path.is_char_device,
EntryType.CHARACTER_DEVICE),
(pathlib.Path.is_block_device, EntryType.BLOCK_DEVICE),
(pathlib.Path.is_fifo, EntryType.FIFO),
(pathlib.Path.is_socket, EntryType.SOCKET)]
if not self._exists(lpath):
raise OSError(errno.ENOENT, 'No such file or directory',
str(lpath))
for pred, entry_type in pred_to_type:
if pred(lpath):
return entry_type
raise RuntimeError('Object is of unknown type, please report a'
'Cerulean bug')
def _size(self, path: AbstractPath) -> int:
lpath = cast(pathlib.Path, path)
return lpath.stat().st_size
def _uid(self, path: AbstractPath) -> int:
lpath = cast(pathlib.Path, path)
return lpath.stat().st_uid
def _gid(self, path: AbstractPath) -> int:
lpath = cast(pathlib.Path, path)
return lpath.stat().st_gid
def _has_permission(self, path: AbstractPath, permission: Permission) -> bool:
lpath = cast(pathlib.Path, path)
return bool(lpath.stat().st_mode & permission.value)
def _set_permission(self, path: AbstractPath, permission: Permission,
value: bool = True) -> None:
lpath = cast(pathlib.Path, path)
mode = lpath.stat().st_mode
if value:
mode = mode | permission.value
else:
mode = mode & ~permission.value
self._chmod(lpath, mode)
def _chmod(self, path: AbstractPath, mode: int) -> None:
lpath = cast(pathlib.Path, path)
lpath.chmod(mode)
def _symlink_to(self, path: AbstractPath, target: AbstractPath) -> None:
lpath = cast(pathlib.Path, path)
ltarget = cast(pathlib.Path, target)
lpath.symlink_to(ltarget)
def _readlink(self, path: AbstractPath, recursive: bool) -> Path:
lpath = cast(pathlib.Path, path)
if recursive:
# pathlib.Path.resolve() raises if the link is broken
# we don't want that, so use our own implementation
max_iter = 32
cur_path = lpath
iter_count = 0
while cur_path.is_symlink() and iter_count < max_iter:
target = pathlib.Path(os.readlink(str(cur_path)))
if not target.is_absolute():
target = cur_path.parent / target
cur_path = target
iter_count += 1
if iter_count == max_iter:
raise RuntimeError('Too many symbolic links detected')
return Path(self, cur_path)
else:
return Path(self, pathlib.Path(
os.readlink(str(lpath))))