Source code for mycroft.util.file_utils

# Copyright 2020 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Mycroft file utils.

This module contains functions handling mycroft resource files and things like
accessing and curating mycroft's cache.
"""

import os
import psutil
from stat import S_ISREG, ST_MTIME, ST_MODE, ST_SIZE
import tempfile
import xdg.BaseDirectory

import mycroft.configuration
from .log import LOG


def resolve_resource_file(res_name):
    """Find an existing file for a resource name.

    Resource names are in the form 'filename.ext' or 'path/filename.ext'.

    The following candidates are probed in order and the first one that is
    an existing file is returned:

    1. ``res_name`` itself, returned unchanged (e.g. a fully qualified path
       coming from a user setting)
    2. ``<dir>/res_name`` for each directory yielded by
       ``xdg.BaseDirectory.load_data_paths('mycroft')``
    3. ``~/.mycroft/res_name`` (the old user location)
    4. ``<data_dir>/res/res_name`` where ``data_dir`` is read from the
       Mycroft configuration
    5. ``../res/res_name`` relative to the directory of this module, i.e.
       the resources shipped with the source package

    Example:
        ``resolve_resource_file('snd/beep.wav')`` returns the first of the
        candidates above that exists, for instance
        '~/.mycroft/snd/beep.wav' (with '~' expanded) or
        '.../mycroft/res/snd/beep.wav' where '...' is the path where the
        package has been installed.

    Args:
        res_name (str): a resource path/name

    Returns:
        (str) path to resource or None if no resource found
    """
    config = mycroft.configuration.Configuration.get()

    def _candidates():
        # Generated lazily so later locations (and the configuration key
        # they need) are only evaluated when earlier ones did not match.

        # A fully qualified file (e.g. a user setting)
        yield res_name

        # The XDG data directories
        for data_path in xdg.BaseDirectory.load_data_paths('mycroft'):
            yield os.path.join(data_path, res_name)

        # The old user location
        yield os.path.join(os.path.expanduser('~'), '.mycroft', res_name)

        # The 'res' folder inside the configured data directory
        res_root = os.path.join(os.path.expanduser(config['data_dir']), 'res')
        yield os.path.expanduser(os.path.join(res_root, res_name))

        # The source package
        packaged = os.path.join(os.path.dirname(__file__), '..', 'res',
                                res_name)
        yield os.path.abspath(os.path.normpath(packaged))

    for candidate in _candidates():
        if os.path.isfile(candidate):
            return candidate
    return None  # Resource cannot be resolved
def read_stripped_lines(filename):
    """Yield the non-empty lines of a file, stripped of surrounding whitespace.

    This is a generator: nothing is read until iteration starts. Lines that
    are empty after stripping are skipped.

    Args:
        filename (str): path to file to read.

    Yields:
        (str) each non-empty line stripped of leading and trailing
        whitespace.
    """
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                yield line


def read_dict(filename, div='='):
    """Read file into dict.

    A file containing:
        foo = bar
        baz = bog

    results in a dict
    {
        'foo': 'bar',
        'baz': 'bog'
    }

    Blank lines are ignored. Each remaining line is split on the first
    occurrence of the divider only, so values may themselves contain the
    divider (e.g. ``url = http://host/?a=b``).

    Args:
        filename (str): path to file
        div (str): divider between dict keys and values

    Returns:
        (dict) generated dictionary

    Raises:
        ValueError: if a non-blank line does not contain the divider.
    """
    d = {}
    with open(filename, 'r') as f:
        for line in f:
            if not line.strip():
                continue  # blank lines carry no entry
            # partition() splits on the first divider only, keeping any
            # further dividers as part of the value.
            key, sep, val = line.partition(div)
            if not sep:
                raise ValueError('Missing divider {!r} in line: {!r}'
                                 .format(div, line))
            d[key.strip()] = val.strip()
    return d


def mb_to_bytes(size):
    """Takes a size in MB and returns the number of bytes.

    Args:
        size(int/float): size in Mega Bytes

    Returns:
        (int/float) size in bytes
    """
    return size * 1024 * 1024


def _get_cache_entries(directory):
    """Get information tuple for all regular files in directory.

    The directory is listed immediately; the individual files are stat'ed
    lazily while the returned generator is consumed. Entries that cannot be
    stat'ed (for example a dangling symlink, or a file removed after the
    directory was listed) are skipped.

    Args:
        directory (str): path to directory to check

    Returns:
        (generator) of (modification time, size, filepath) tuples
    """
    # Listed eagerly so an invalid directory fails at call time.
    names = os.listdir(directory)

    def _regular_files():
        for name in names:
            path = os.path.join(directory, name)
            try:
                info = os.stat(path)
            except OSError:
                # Entry vanished or is a broken link; nothing to curate.
                continue
            # leave only regular files, insert modification date
            if S_ISREG(info[ST_MODE]):
                yield info[ST_MTIME], info[ST_SIZE], path

    return _regular_files()


def _delete_oldest(entries, bytes_needed):
    """Delete files with oldest modification date until space is freed.

    Deletion is best effort: a file that cannot be removed is logged and
    skipped, and does not count towards the freed space.

    Args:
        entries (iterable): (modification time, size, filepath) tuples
        bytes_needed (int): disk space that needs to be freed

    Returns:
        (list) all removed paths
    """
    deleted_files = []
    space_freed = 0
    # Tuples sort by their first element, so oldest files come first.
    for _moddate, fsize, path in sorted(entries):
        try:
            os.remove(path)
        except OSError as err:
            LOG.warning('Failed to remove cache file {}: {}'.format(path,
                                                                    err))
        else:
            space_freed += fsize
            deleted_files.append(path)

        if space_freed > bytes_needed:
            break  # deleted enough!

    return deleted_files


def curate_cache(directory, min_free_percent=5.0, min_free_disk=50):
    """Clear out the directory if needed.

    The curation will only occur if both the percentage and actual disk space
    is below the limit. This assumes all the files in the directory can be
    deleted as freely.

    Args:
        directory (str): directory path that holds cached files
        min_free_percent (float): percentage (0.0-100.0) of drive to keep free,
                                  default is 5% if not specified.
        min_free_disk (float): minimum allowed disk space in MB,
                               default value is 50 MB if not specified.

    Returns:
        (list) paths of the files that were deleted, empty if no curation
        was needed.
    """
    # Simpleminded implementation -- keep a certain percentage of the
    # disk available.
    # TODO: Would be easy to add more options, like whitelisted files, etc.
    deleted_files = []
    space = psutil.disk_usage(directory)

    min_free_disk = mb_to_bytes(min_free_disk)
    percent_free = 100.0 - space.percent
    if percent_free < min_free_percent and space.free < min_free_disk:
        LOG.info('Low diskspace detected, cleaning cache')
        # calculate how many bytes we need to delete
        bytes_needed = (min_free_percent - percent_free) / 100.0 * space.total
        bytes_needed = int(bytes_needed + 1.0)

        # get all entries in the directory w/ stats
        entries = _get_cache_entries(directory)
        # delete as many as needed starting with the oldest
        deleted_files = _delete_oldest(entries, bytes_needed)

    return deleted_files
def get_cache_directory(domain=None):
    """Return a directory that can be used for caching data.

    The directory is meant for temporary caches that speed things up. It
    will likely live on a small RAM disk and may be cleared at any time, so
    any code relying on cached files must be able to fall back and
    regenerate them.

    The root is the "cache_path" configuration value; when that is unset or
    empty, the path built by ``get_temp_path('mycroft', 'cache')`` is used
    instead.

    Args:
        domain (str): The cache domain. Basically just a subdirectory.

    Returns:
        (str) a path to the directory where you can cache data
    """
    settings = mycroft.configuration.Configuration.get()
    # Fall back to the temp-dir based location when nothing is configured.
    cache_root = settings.get("cache_path") or get_temp_path('mycroft',
                                                             'cache')
    return ensure_directory_exists(cache_root, domain)
def ensure_directory_exists(directory, domain=None, permissions=0o777):
    """Create a directory and give access rights to all

    Creation is best effort: if the directory cannot be created a warning
    is logged and the path is still returned.

    Args:
        directory (str): Root directory
        domain (str): Domain. Basically a subdirectory to prevent things like
                      overlapping signal filenames.
        permissions (int): Directory permissions (default is 0o777)

    Returns:
        (str) a path to the directory
    """
    if domain:
        directory = os.path.join(directory, domain)

    # Expand '~' first, then normalize: normalizing first would collapse a
    # path such as '~/../x' into the relative path 'x'.
    directory = os.path.normpath(os.path.expanduser(directory))

    if not os.path.isdir(directory):
        # Clear the umask so the requested permissions are not masked out;
        # captured outside the try so the finally clause can always restore.
        save = os.umask(0)
        try:
            # exist_ok: another process may create the directory between
            # the isdir() check above and this call.
            os.makedirs(directory, permissions, exist_ok=True)
        except OSError:
            LOG.warning("Failed to create: " + directory)
        finally:
            os.umask(save)

    return directory


def create_file(filename):
    """Create the file filename and create any directories needed

    An existing file is truncated to zero length.

    Args:
        filename: Path to the file to be created
    """
    ensure_directory_exists(os.path.dirname(filename), permissions=0o775)
    with open(filename, 'w') as f:
        f.write('')


def get_temp_path(*args):
    """Generate a valid path in the system temp directory.

    This method accepts one or more strings as arguments. The arguments are
    joined and returned as a complete path inside the systems temp directory.
    Importantly, this will not create any directories or files.

    Example usage: get_temp_path('mycroft', 'audio', 'example.wav')
    Will return the equivalent of: '/tmp/mycroft/audio/example.wav'

    Args:
        *args (str): directories and/or filename

    Returns:
        (str) a valid path in the systems temp directory

    Raises:
        TypeError: if any argument cannot be joined into a path.
    """
    try:
        path = os.path.join(tempfile.gettempdir(), *args)
    except TypeError as err:
        raise TypeError("Could not create a temp path, get_temp_path() only "
                        "accepts Strings") from err
    return path