"""Reading and writing common file formats."""
# Core Library
import csv
import hashlib
import json
import os
import pickle
import platform
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
# Third party
from typing_extensions import Literal
# First party
from mpu.datastructures import EList
def read(filepath: str, **kwargs: Any) -> Any:
    """
    Read a file.

    Supported formats:

    * CSV
    * JSON, JSONL
    * pickle

    Parameters
    ----------
    filepath : str
        Path to the file that should be read. This methods action depends
        mainly on the file extension.
    kwargs : Dict
        Any keywords for the specific file format. For CSV, this is
        'delimiter', 'quotechar', 'skiprows', 'format'

    Returns
    -------
    data : Union[str, bytes] or other (e.g. format=dicts)

    Raises
    ------
    NotImplementedError
        If the file name extension is not supported.
    """
    supported_formats = [".csv", ".json", ".jsonl", ".pickle"]
    # Extensions are matched case-insensitively; lower-case once up front.
    lower = filepath.lower()
    if lower.endswith(".csv"):
        return _read_csv(filepath, kwargs)
    if lower.endswith(".json"):
        with open(filepath, encoding="utf8") as data_file:
            data: Any = json.load(data_file, **kwargs)
        return data
    if lower.endswith(".jsonl"):
        return _read_jsonl(filepath, kwargs)
    if lower.endswith(".pickle"):
        # NOTE: pickle.load can execute arbitrary code; only read trusted files.
        with open(filepath, "rb") as handle:
            data_pkl = pickle.load(handle)
        return data_pkl
    if lower.endswith((".yml", ".yaml")):
        raise NotImplementedError(
            "YAML is not supported, because you need "
            "PyYAML in Python3. "
            "See "
            "https://stackoverflow.com/a/42054860/562769"
            " as a guide how to use it."
        )
    if lower.endswith((".h5", ".hdf5")):
        raise NotImplementedError(
            "HDF5 is not supported. See "
            "https://stackoverflow.com/a/41586571/562769"
            " as a guide how to use it."
        )
    raise NotImplementedError(
        f"File '{filepath}' does not end with one "
        f"of the supported file name extensions. "
        f"Supported are: {supported_formats}"
    )
def _read_csv(filepath: str, kwargs: Dict) -> Union[List, Dict]:
"""See documentation of mpu.io.read."""
if "delimiter" not in kwargs:
kwargs["delimiter"] = ","
if "quotechar" not in kwargs:
kwargs["quotechar"] = '"'
if "skiprows" not in kwargs:
kwargs["skiprows"] = []
if isinstance(kwargs["skiprows"], int):
kwargs["skiprows"] = list(range(kwargs["skiprows"]))
if "format" in kwargs:
format_ = kwargs["format"]
kwargs.pop("format", None)
else:
format_ = "default"
skiprows = kwargs["skiprows"]
kwargs.pop("skiprows", None)
newline = None
if "newline" in kwargs:
newline = kwargs["newline"]
del kwargs["newline"]
with open(filepath, encoding="utf8", newline=newline) as fp:
if format_ == "default":
reader = csv.reader(fp, **kwargs)
data_tmp = EList(list(reader))
data: Union[List, Dict] = data_tmp.remove_indices(skiprows)
elif format_ == "dicts":
reader_list = csv.DictReader(fp, **kwargs)
data = list(reader_list)
else:
raise NotImplementedError(f"Format '{format_}' unknown")
return data
def _read_jsonl(filepath: str, kwargs: Dict) -> List:
"""See documentation of mpu.io.read."""
with open(filepath, encoding="utf8") as data_file:
data = [json.loads(line, **kwargs) for line in data_file if len(line) > 0]
return data
def write(filepath: str, data: Union[Dict, List], **kwargs: Any) -> Any:
    """
    Write a file.

    Supported formats:

    * CSV
    * JSON, JSONL
    * pickle

    Parameters
    ----------
    filepath : str
        Path to the file that should be read. This methods action depends
        mainly on the file extension. Make sure that it ends in .csv, .json,
        .jsonl, or .pickle.
    data : Union[Dict, List]
        Content that should be written
    kwargs : Dict
        Any keywords for the specific file format.

    Returns
    -------
    data : str or bytes

    Raises
    ------
    NotImplementedError
        If the file name extension is not supported.
    """
    supported_formats = [".csv", ".json", ".jsonl", ".pickle"]
    # Extensions are matched case-insensitively; lower-case once up front.
    lower = filepath.lower()
    if lower.endswith(".csv"):
        return _write_csv(filepath, data, kwargs)
    if lower.endswith(".json"):
        return _write_json(filepath, data, kwargs)
    if lower.endswith(".jsonl"):
        return _write_jsonl(filepath, data, kwargs)
    if lower.endswith(".pickle"):
        return _write_pickle(filepath, data, kwargs)
    if lower.endswith((".yml", ".yaml")):
        raise NotImplementedError(
            "YAML is not supported, because you need "
            "PyYAML in Python3. "
            "See "
            "https://stackoverflow.com/a/42054860/562769"
            " as a guide how to use it."
        )
    if lower.endswith((".h5", ".hdf5")):
        raise NotImplementedError(
            "HDF5 is not supported. See "
            "https://stackoverflow.com/a/41586571/562769"
            " as a guide how to use it."
        )
    raise NotImplementedError(
        f"File '{filepath}' does not end in one of the "
        f"supported formats. Supported are: {supported_formats}"
    )
def _write_csv(filepath: str, data: Any, kwargs: Dict) -> Any:
"""See documentation of mpu.io.write."""
newline = None
if "newline" in kwargs:
newline = kwargs["newline"]
del kwargs["newline"]
with open(filepath, "w", encoding="utf8", newline=newline) as fp:
if "delimiter" not in kwargs:
kwargs["delimiter"] = ","
if "quotechar" not in kwargs:
kwargs["quotechar"] = '"'
writer = csv.writer(fp, **kwargs)
writer.writerows(data)
return data
def _write_json(filepath: str, data: Any, kwargs: Dict) -> Any:
"""See documentation of mpu.io.write."""
with open(filepath, "w", encoding="utf8") as outfile:
if "indent" not in kwargs:
kwargs["indent"] = 4
if "sort_keys" not in kwargs:
kwargs["sort_keys"] = True
if "separators" not in kwargs:
kwargs["separators"] = (",", ": ")
if "ensure_ascii" not in kwargs:
kwargs["ensure_ascii"] = False
str_ = json.dumps(data, **kwargs)
outfile.write(str_)
return data
def _write_jsonl(filepath: str, data: Any, kwargs: Dict) -> Any:
"""See documentation of mpu.io.write."""
with open(filepath, "w", encoding="utf8") as outfile:
kwargs["indent"] = None # JSON has to be on one line!
if "sort_keys" not in kwargs:
kwargs["sort_keys"] = True
if "separators" not in kwargs:
kwargs["separators"] = (",", ": ")
if "ensure_ascii" not in kwargs:
kwargs["ensure_ascii"] = False
for line in data:
str_ = json.dumps(line, **kwargs)
outfile.write(str_)
outfile.write("\n")
return data
def _write_pickle(filepath: str, data: Any, kwargs: Dict) -> Any:
"""See documentation of mpu.io.write."""
if "protocol" not in kwargs:
kwargs["protocol"] = pickle.HIGHEST_PROTOCOL
with open(filepath, "wb") as handle:
pickle.dump(data, handle, **kwargs)
return data
def urlread(url: str, encoding: str = "utf8") -> str:
    """
    Read the content of an URL.

    Parameters
    ----------
    url : str
    encoding : str (default: "utf8")

    Returns
    -------
    content : str
    """
    # Core Library
    from urllib.request import urlopen

    # Use a context manager so the HTTP response is always closed
    # (the previous code leaked the open connection).
    with urlopen(url) as response:
        content = response.read()
    return content.decode(encoding)
def download(source: str, sink: Optional[str] = None) -> str:
    """
    Download a file.

    Parameters
    ----------
    source : str
        Where the file comes from. Some URL.
    sink : str, optional (default: same filename in current directory)
        Where the file gets stored. Some filepath in the local file system.

    Returns
    -------
    sink : str
        The filepath the file was stored at.
    """
    # Core Library
    from urllib.request import urlretrieve

    if sink is None:
        # Default: keep the source's filename, in the current directory.
        filename = os.path.split(source)[1]
        sink = os.path.abspath(filename)
    urlretrieve(source, sink)
    return sink
def hash(
    filepath: str, method: Literal["sha1", "md5"] = "sha1", buffer_size: int = 65536
) -> str:
    """
    Calculate a hash of a local file.

    Parameters
    ----------
    filepath : str
    method : {'sha1', 'md5'}
    buffer_size : int, optional (default: 65536 byte = 64 KiB)
        in byte

    Returns
    -------
    hash : str
    """
    # Dispatch table instead of an if/elif chain.
    constructors = {"sha1": hashlib.sha1, "md5": hashlib.md5}
    if method not in constructors:
        raise NotImplementedError(
            f"Only md5 and sha1 hashes are known, but '{method}' was specified."
        )
    digest = constructors[method]()
    with open(filepath, "rb") as fp:
        # Read fixed-size chunks until the sentinel b"" (EOF) comes back.
        for chunk in iter(lambda: fp.read(buffer_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def get_creation_datetime(filepath: str) -> Optional[datetime]:
    """
    Get the date that a file was created.

    Parameters
    ----------
    filepath : str

    Returns
    -------
    creation_datetime : Optional[datetime]
        None on platforms (e.g. Linux) where creation time is unavailable.
    """
    if platform.system() == "Windows":
        # On Windows, st_ctime is the creation time.
        return datetime.fromtimestamp(os.path.getctime(filepath))
    stat_result = os.stat(filepath)
    birthtime = getattr(stat_result, "st_birthtime", None)
    if birthtime is None:
        # Probably Linux: stat does not expose a creation time, so there is
        # no easy way to get one here.
        return None
    return datetime.fromtimestamp(birthtime)
def get_modification_datetime(filepath: str) -> datetime:
    """
    Get the datetime that a file was last modified.

    Parameters
    ----------
    filepath : str

    Returns
    -------
    modification_datetime : datetime
    """
    # Third party
    import tzlocal

    local_tz = tzlocal.get_localzone()
    # Attach the local timezone to the (naive) modification timestamp.
    naive_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
    return naive_mtime.replace(tzinfo=local_tz)
def get_access_datetime(filepath: str) -> datetime:
    """
    Get the last time filepath was accessed.

    Parameters
    ----------
    filepath : str

    Returns
    -------
    access_datetime : datetime
    """
    # Third party
    import tzlocal

    local_tz = tzlocal.get_localzone()
    # Attach the local timezone to the (naive) access timestamp.
    naive_atime = datetime.fromtimestamp(os.path.getatime(filepath))
    return naive_atime.replace(tzinfo=local_tz)
def gzip_file(source: str, sink: str) -> None:
    """
    Create a GZIP file from a source file.

    Parameters
    ----------
    source : str
        Filepath
    sink : str
        Filepath
    """
    # Core Library
    import gzip

    # Stream the raw bytes straight into the gzip writer; both handles are
    # closed automatically, even on error.
    with open(source, "rb") as raw:
        with gzip.open(sink, "wb") as compressed:
            compressed.writelines(raw)