Source code for xonsh.commands_cache

"""Module for caching command & alias names as well as for predicting whether
a command will be able to be run in the background.

A background predictor is a function that accepts a single argument list
and returns whether or not the process can be run in the background (returns
True) or must be run in the foreground (returns False).
"""
import argparse
import collections.abc as cabc
import functools
import os
import pickle
import sys
import threading
import time
import typing as tp
from pathlib import Path

from xonsh.built_ins import XSH
from xonsh.lazyasd import lazyobject
from xonsh.platform import ON_POSIX, ON_WINDOWS, pathbasename
from xonsh.tools import executables_in


class CommandsCache(cabc.Mapping):
    """A lazy cache representing the commands available on the file system.

    The keys are the command names and the values a tuple of (loc, has_alias)
    where loc is either a str pointing to the executable on the file system or
    None (if no executable exists) and has_alias is a boolean flag for whether
    the command has an alias.

    Methods prefixed with ``lazy`` only consult the in-memory cache; their
    un-prefixed counterparts call :meth:`update_cache` first.
    """

    CACHE_FILE = "commands-cache.pickle"

    def __init__(self):
        self._cmds_cache = {}
        self._path_checksum = None
        self._alias_checksum = None
        self._path_mtime = -1
        self.threadable_predictors = default_threadable_predictors()
        self._loaded_pickled = False

        # force it to load from env by setting it to None
        self._cache_file = None

    @property
    def cache_file(self):
        """Path of the on-disk cache, or ``""`` when caching to disk is off.

        The value is resolved from the environment on first access and then
        kept on the instance, so later accesses do not touch the environment.
        """
        if self._cache_file is None:
            env = XSH.env or {}
            # Path to the cache-file where all commands/aliases are cached
            # for pre-loading
            if "XONSH_DATA_DIR" in env and env.get("COMMANDS_CACHE_SAVE_INTERMEDIATE"):
                self._cache_file = (
                    Path(env["XONSH_DATA_DIR"]).joinpath(self.CACHE_FILE).resolve()
                )
            else:
                # set a falsy value other than None
                self._cache_file = ""
        return self._cache_file

    def __contains__(self, key):
        self.update_cache()
        return self.lazyin(key)

    def __iter__(self):
        for cmd, _ in self.iter_commands():
            yield cmd

    def iter_commands(self):
        """Wrapper for handling windows path behaviour"""
        for cmd, (path, is_alias) in self.all_commands.items():
            if ON_WINDOWS and path is not None:
                # All command keys are stored in uppercase on Windows.
                # This ensures the original command name is returned.
                cmd = pathbasename(path)
            yield cmd, (path, is_alias)

    def __len__(self):
        return len(self.all_commands)

    def __getitem__(self, key) -> "tuple[str, bool]":
        self.update_cache()
        return self.lazyget(key)

    def is_empty(self):
        """Returns whether the cache is populated or not."""
        return len(self._cmds_cache) == 0

    @staticmethod
    def get_possible_names(name):
        """Generates the possible `PATHEXT` extension variants of a given
        executable name on Windows as a list, conserving the ordering in
        `PATHEXT`. Returns a list as `name` being the only item in it on
        other platforms."""
        if ON_WINDOWS:
            # XSH.env may be unset; fall back to an empty mapping as the
            # other methods of this class do.
            pathext = (XSH.env or {}).get("PATHEXT", [])
            name = name.upper()
            return [name + ext for ext in ([""] + pathext)]
        else:
            return [name]

    @staticmethod
    def remove_dups(paths):
        """Yield the real path of every existing directory in ``paths``,
        dropping entries that resolve to an already-seen real path."""
        cont = set()
        for p in map(os.path.realpath, paths):
            if p not in cont:
                cont.add(p)
                if os.path.isdir(p):
                    yield p

    def _check_changes(self, paths: tp.Tuple[str, ...], aliases):
        """Yield three flags: PATH unchanged, aliases unchanged and
        PATH directory contents unchanged, updating the stored checksums
        as the generator is consumed."""
        # did PATH change?
        path_hash = hash(paths)
        yield path_hash == self._path_checksum
        self._path_checksum = path_hash

        # did aliases change? (only the alias names are hashed)
        al_hash = hash(frozenset(aliases))
        yield al_hash == self._alias_checksum
        self._alias_checksum = al_hash

        # did the contents of any directory in PATH change?
        def _mtime(path):
            # A directory may vanish between the isdir() check in
            # remove_dups() and this stat(); do not let that raise.
            try:
                return os.stat(path).st_mtime
            except OSError:
                return 0

        max_mtime = max(map(_mtime, paths), default=0)
        yield max_mtime <= self._path_mtime
        self._path_mtime = max_mtime

    @property
    def all_commands(self):
        self.update_cache()
        return self._cmds_cache

    def update_cache(self):
        """Refresh the in-memory cache if PATH, its directories or the
        aliases changed, and return the cache dict."""
        env = XSH.env or {}
        paths = tuple(CommandsCache.remove_dups(env.get("PATH") or []))

        # in case it is empty or unset
        alss = {} if XSH.aliases is None else XSH.aliases

        (
            no_new_paths,
            no_new_alias,
            no_new_bins,
        ) = tuple(self._check_changes(paths, alss))

        if no_new_paths and no_new_bins:
            if not no_new_alias:  # only aliases have changed
                for cmd, alias in alss.items():
                    key = cmd.upper() if ON_WINDOWS else cmd
                    if key in self._cmds_cache:
                        self._cmds_cache[key] = (self._cmds_cache[key][0], alias)
                    else:
                        self._cmds_cache[key] = (cmd, True)
                # save the changes to cache as well
                self.set_cmds_cache(self._cmds_cache)
            return self._cmds_cache

        if self.cache_file and self.cache_file.exists():
            # pickle the result only if XONSH_DATA_DIR is set
            if not self._loaded_pickled:
                # first time load the commands from cache-file
                self._cmds_cache = self.get_cached_commands()
                self._loaded_pickled = True

            # also start a thread that updates the cache in the bg
            worker = threading.Thread(
                target=self._update_cmds_cache,
                args=[paths, alss],
                daemon=True,
            )
            worker.start()
        else:
            self._update_cmds_cache(paths, alss)
        return self._cmds_cache

    @staticmethod
    @functools.lru_cache(maxsize=10)
    def _get_all_cmds(paths: tp.Sequence[str]):
        """Cache results when possible

        This will be helpful especially during tests where the PATH will be the same mostly.
        """

        def _getter():
            for path in reversed(paths):
                # iterate backwards so that entries at the front of PATH overwrite
                # entries at the back.
                for cmd in executables_in(path):
                    yield cmd, os.path.join(path, cmd)

        return dict(_getter())

    def _update_cmds_cache(
        self, paths: tp.Sequence[str], aliases: tp.Dict[str, str]
    ) -> tp.Dict[str, tp.Any]:
        """Update the cmds_cache variable in background without slowing down parseLexer"""
        env = XSH.env or {}  # type: ignore

        allcmds = {}
        for cmd, path in self._get_all_cmds(paths).items():
            key = cmd.upper() if ON_WINDOWS else cmd
            allcmds[key] = (path, aliases.get(key, None))

        warn_cnt = env.get("COMMANDS_CACHE_SIZE_WARNING")
        if warn_cnt and len(allcmds) > warn_cnt:
            print(
                f"Warning! Found {len(allcmds):,} executable files in the PATH directories!",
                file=sys.stderr,
            )

        for cmd in aliases:
            # Normalise the key *before* the membership test: the keys of
            # allcmds are upper-cased on Windows.
            key = cmd.upper() if ON_WINDOWS else cmd
            if key not in allcmds:
                allcmds[key] = (cmd, True)  # type: ignore
        return self.set_cmds_cache(allcmds)

    def get_cached_commands(self) -> tp.Dict[str, str]:
        """Load the commands dict from the cache file; returns ``{}`` when
        there is no cache file or it cannot be read."""
        if self.cache_file and self.cache_file.exists():
            try:
                # NOTE(security): pickle executes arbitrary code on load;
                # this is only acceptable while the cache file lives in a
                # directory that only the user can write to.
                return pickle.loads(self.cache_file.read_bytes()) or {}
            except Exception:
                # the file is corrupt
                self.cache_file.unlink(missing_ok=True)
        return {}

    def set_cmds_cache(self, allcmds: tp.Dict[str, tp.Any]) -> tp.Dict[str, tp.Any]:
        """write cmds to cache-file and instance-attribute"""
        if self.cache_file:
            self.cache_file.write_bytes(pickle.dumps(allcmds))
        self._cmds_cache = allcmds
        return allcmds

    def cached_name(self, name):
        """Returns the name that would appear in the cache, if it exists."""
        if name is None:
            return None
        cached = pathbasename(name)
        if ON_WINDOWS:
            keys = self.get_possible_names(cached)
            cached = next((k for k in keys if k in self._cmds_cache), None)
        return cached

    def lazyin(self, key):
        """Checks if the value is in the current cache without the potential to
        update the cache. It just says whether the value is known *now*. This
        may not reflect precisely what is on the $PATH.
        """
        return self.cached_name(key) in self._cmds_cache

    def lazyiter(self):
        """Returns an iterator over the current cache contents without the
        potential to update the cache. This may not reflect what is on the
        $PATH.
        """
        return iter(self._cmds_cache)

    def lazylen(self):
        """Returns the length of the current cache contents without the
        potential to update the cache. This may not reflect precisely
        what is on the $PATH.
        """
        return len(self._cmds_cache)

    def lazyget(self, key, default=None):
        """A lazy value getter."""
        return self._cmds_cache.get(self.cached_name(key), default)

    def locate_binary(self, name, ignore_alias=False):
        """Locates an executable on the file system using the cache.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)
        """
        # make sure the cache is up to date before looking the name up
        self.update_cache()
        return self.lazy_locate_binary(name, ignore_alias)

    def lazy_locate_binary(self, name, ignore_alias=False):
        """Locates an executable in the cache, without checking its validity.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)
        """
        possibilities = self.get_possible_names(name)
        if ON_WINDOWS:
            # Windows users expect to be able to execute files in the same
            # directory without `./`
            local_bin = next((fn for fn in possibilities if os.path.isfile(fn)), None)
            if local_bin:
                return os.path.abspath(local_bin)
        cached = next((cmd for cmd in possibilities if cmd in self._cmds_cache), None)
        if cached:
            (path, alias) = self._cmds_cache[cached]
            ispure = path == pathbasename(path)
            if alias and ignore_alias and ispure:
                # pure alias, which we are ignoring
                return None
            else:
                return path
        elif os.path.isfile(name) and name != pathbasename(name):
            return name
        return None

    def is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias.
        """
        self.update_cache()
        return self.lazy_is_only_functional_alias(name)

    def lazy_is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias. This search is performed lazily.
        """
        val = self._cmds_cache.get(name, None)
        if val is None:
            return False
        # Use the lazy lookup so that this method never refreshes the cache.
        return (
            val == (name, True)
            and self.lazy_locate_binary(name, ignore_alias=True) is None
        )

    def predict_threadable(self, cmd):
        """Predicts whether a command list is able to be run on a background
        thread, rather than the main thread.
        """
        predictor = self.get_predictor_threadable(cmd[0])
        return predictor(cmd[1:])

    def get_predictor_threadable(self, cmd0):
        """Return the predictor whether a command list is able to be run on a
        background thread, rather than the main thread.
        """
        name = self.cached_name(cmd0)
        predictors = self.threadable_predictors
        if ON_WINDOWS:
            # On Windows all names (keys) are stored in upper case so instead
            # we get the original cmd or alias name
            path, _ = self.lazyget(name, (None, None))
            if path is None:
                return predict_true
            else:
                name = pathbasename(path)
            if name not in predictors:
                pre = os.path.splitext(name)[0]
                if pre in predictors:
                    predictors[name] = predictors[pre]
        if name not in predictors:
            # remember the computed predictor for the next lookup
            predictors[name] = self.default_predictor(name, cmd0)
        predictor = predictors[name]
        return predictor

    #
    # Background Predictors (as methods)
    #

    def default_predictor(self, name, cmd0):
        """Default predictor, using predictor from original command if the
        command is an alias, elseif build a predictor based on binary analysis
        on POSIX, else return predict_true.
        """
        # alias stuff
        if not os.path.isabs(cmd0) and os.sep not in cmd0:
            # XSH.aliases may be missing or None
            alss = getattr(XSH, "aliases", None) or {}
            if cmd0 in alss:
                return self.default_predictor_alias(cmd0)

        # other default stuff
        if ON_POSIX:
            return self.default_predictor_readbin(
                name, cmd0, timeout=0.1, failure=predict_true
            )
        else:
            return predict_true

    def default_predictor_alias(self, cmd0):
        """Build a predictor for an alias by resolving it to the command it
        finally runs and prepending the arguments the alias chain supplies.

        Returns ``predict_true`` for aliases that are not non-empty
        sequences, for self-aliases, and when the chain is too deep.
        """
        # this limit is set to handle infinite loops in aliases definition
        alias_recursion_limit = 10
        # args passed to the aliased command, in command-line order
        first_args = []
        alss = getattr(XSH, "aliases", None) or {}
        while cmd0 in alss:
            alias_name = alss[cmd0]
            if (
                isinstance(alias_name, (str, bytes))
                or not isinstance(alias_name, cabc.Sequence)
                or not alias_name
            ):
                return predict_true
            # args of an inner alias come before those of the outer one
            first_args[:0] = alias_name[1:]
            if cmd0 == alias_name[0]:
                # it is a self-alias stop recursion immediately
                return predict_true
            cmd0 = alias_name[0]
            alias_recursion_limit -= 1
            if alias_recursion_limit == 0:
                return predict_true
        predictor_cmd0 = self.get_predictor_threadable(cmd0)
        return lambda cmd1: predictor_cmd0([*first_args, *cmd1])

    def default_predictor_readbin(self, name, cmd0, timeout, failure):
        """Make a default predictor by analyzing the content of the binary.
        Should only works on POSIX. Return failure if the analysis fails.

        Parameters
        ----------
        name : str
            name used to look the binary up in the cache when ``cmd0`` is
            not a path
        cmd0 : str
            the command as typed
        timeout : float
            maximum number of seconds to spend reading the file
        failure
            value returned when the file cannot be analyzed in time
        """
        if os.path.isabs(cmd0) or os.sep in cmd0:
            fname = cmd0
        else:
            fname = self.lazy_locate_binary(name)

        if fname is None or not os.path.isfile(fname):
            return failure

        try:
            fd = os.open(fname, os.O_RDONLY | os.O_NONBLOCK)
        except Exception:
            return failure  # opening error

        # each key is a group of byte strings that must ALL be present
        search_for = {
            (b"ncurses",): [False],
            (b"libgpm",): [False],
            (b"isatty", b"tcgetattr", b"tcsetattr"): [False, False, False],
        }
        # monotonic clock: the timeout must not depend on wall-clock jumps
        deadline = time.monotonic() + timeout
        block = b""
        try:
            while time.monotonic() < deadline:
                previous_block = block
                try:
                    block = os.read(fd, 2048)
                except Exception:
                    # should not occur, except e.g. if a file is deleted and a
                    # dir is created with the same name between
                    # os.path.isfile and os.open
                    return failure
                if len(block) == 0:
                    return predict_true  # no keys of search_for found
                # include the previous block so that a pattern split across
                # two reads is still found
                analyzed_block = previous_block + block
                for keys, found in search_for.items():
                    for i, key in enumerate(keys):
                        if not found[i] and key in analyzed_block:
                            found[i] = True
                    if all(found):
                        return predict_false  # use one key of search_for
            return failure  # timeout
        finally:
            os.close(fd)
# # Background Predictors #
def predict_true(args):
    """Predictor that reports every command as threadable.

    The argument list is accepted for interface compatibility and ignored.
    """
    return True
def predict_false(args):
    """Predictor that reports every command as not threadable.

    The argument list is accepted for interface compatibility and ignored.
    """
    return False
@lazyobject
def SHELL_PREDICTOR_PARSER():
    """Argument parser recognising a shell's ``-c`` option and an optional
    script filename."""
    parser = argparse.ArgumentParser(prog="shell", add_help=False)
    parser.add_argument("-c", nargs="?", default=None)
    parser.add_argument("filename", nargs="?", default=None)
    return parser
def predict_shell(args):
    """Predict the backgroundability of the normal shell interface, which
    comes down to whether it is being run in subproc mode.

    The shell is predicted threadable only when it was given a ``-c``
    value or a filename.
    """
    parsed, _unknown = SHELL_PREDICTOR_PARSER.parse_known_args(args)
    return parsed.c is not None or parsed.filename is not None
@lazyobject
def HELP_VER_PREDICTOR_PARSER():
    """Argument parser detecting the help and version switches
    ``-h``, ``--help``, ``-v``, ``-V`` and ``--version``.

    Both options take an optional value. ``const=True`` makes a bare switch
    (e.g. ``--help`` with nothing after it) store ``True``; without it
    argparse stores ``None``, which is indistinguishable from the switch
    being absent.
    """
    p = argparse.ArgumentParser("cmd", add_help=False)
    p.add_argument(
        "-h",
        "--help",
        dest="help",
        nargs="?",
        action="store",
        const=True,
        default=None,
    )
    p.add_argument(
        "-v",
        "-V",
        "--version",
        dest="version",
        nargs="?",
        action="store",
        const=True,
        default=None,
    )
    return p
def predict_help_ver(args):
    """Predict the backgroundability of commands that have help & version
    switches: -h, --help, -v, -V, --version. If either of these options is
    present, the command is assumed to print to stdout normally and is therefore
    threadable. Otherwise, the command is assumed to not be threadable.
    This is useful for commands, like top, that normally enter alternate mode
    but may not in certain circumstances.
    """
    parsed, _unknown = HELP_VER_PREDICTOR_PARSER.parse_known_args(args)
    if parsed.help is not None:
        return True
    return parsed.version is not None
@lazyobject
def HG_PREDICTOR_PARSER():
    """Argument parser extracting the mercurial sub-command and the
    ``-i``/``--interactive`` flag.

    ``command`` is optional (``None`` when absent): a required positional
    would make argparse print a usage error and raise ``SystemExit`` when
    ``hg`` is run without arguments.
    """
    p = argparse.ArgumentParser("hg", add_help=False)
    p.add_argument("command", nargs="?", default=None)
    p.add_argument(
        "-i", "--interactive", action="store_true", default=False, dest="interactive"
    )
    return p
def predict_hg(args):
    """Predict if mercurial is about to be run in interactive mode.
    If it is interactive, predict False. If it isn't, predict True.
    Also predict False for certain commands, such as split.
    """
    parsed, _unknown = HG_PREDICTOR_PARSER.parse_known_args(args)
    return parsed.command != "split" and not parsed.interactive
def predict_env(args):
    """Predict if env is launching a threadable command or not.
    The launched command is extracted from env args, and the predictor of
    the launched command is used. Predicts True when no command is found."""
    for idx, arg in enumerate(args):
        # skip empty strings, options and VAR=value assignments
        if not arg or arg[0] == "-" or "=" in arg:
            continue
        # arg is the command and everything after it is its arguments,
        # so the tail of the list is handed to the command's own predictor
        return XSH.commands_cache.predict_threadable(args[idx:])
    return True
def default_threadable_predictors():
    """Build and return a new dict mapping known command names to their
    threadable predictor functions.

    The result is a plain dict, so looking up a command that is not listed
    raises ``KeyError``; no default predictor is supplied here.
    """
    # alphabetical, for what it is worth.
    predictors = {
        "asciinema": predict_help_ver,
        "aurman": predict_false,
        "awk": predict_true,
        "bash": predict_shell,
        "cat": predict_false,
        "clear": predict_false,
        "cls": predict_false,
        "cmd": predict_shell,
        "cryptop": predict_false,
        "cryptsetup": predict_true,
        "csh": predict_shell,
        "curl": predict_true,
        "elvish": predict_shell,
        "emacsclient": predict_false,
        "env": predict_env,
        "ex": predict_false,
        "fish": predict_shell,
        "gawk": predict_true,
        "ghci": predict_help_ver,
        "git": predict_true,
        "gvim": predict_help_ver,
        "hg": predict_hg,
        "htop": predict_help_ver,
        "ipython": predict_shell,
        "julia": predict_shell,
        "ksh": predict_shell,
        "less": predict_help_ver,
        "ls": predict_true,
        "man": predict_help_ver,
        "mc": predict_false,
        "more": predict_help_ver,
        "mutt": predict_help_ver,
        "mvim": predict_help_ver,
        "nano": predict_help_ver,
        "nmcli": predict_true,
        "nvim": predict_false,
        "percol": predict_false,
        "ponysay": predict_help_ver,
        "psql": predict_false,
        "push": predict_shell,
        "pv": predict_false,
        "python": predict_shell,
        "python2": predict_shell,
        "python3": predict_shell,
        "ranger": predict_help_ver,
        "repo": predict_help_ver,
        "rview": predict_false,
        "rvim": predict_false,
        "rwt": predict_shell,
        "scp": predict_false,
        "sh": predict_shell,
        "ssh": predict_false,
        "startx": predict_false,
        "sudo": predict_help_ver,
        "sudoedit": predict_help_ver,
        "systemctl": predict_true,
        "tcsh": predict_shell,
        "telnet": predict_false,
        "top": predict_help_ver,
        "tput": predict_false,
        "udisksctl": predict_true,
        "unzip": predict_true,
        "vi": predict_false,
        "view": predict_false,
        "vim": predict_false,
        "vimpager": predict_help_ver,
        "weechat": predict_help_ver,
        "wget": predict_true,
        "xclip": predict_help_ver,
        "xdg-open": predict_false,
        "xo": predict_help_ver,
        "xon.sh": predict_shell,
        "xonsh": predict_shell,
        "yes": predict_false,
        "zip": predict_true,
        "zipinfo": predict_true,
        "zsh": predict_shell,
    }
    return predictors