Source code for utils

import os
import shutil
import importlib
import sys
import inspect
import shlex
import logging
import pip
import subprocess
import yaml
import aiohttp

import pyconfig

logger = logging.getLogger(__name__)
logger.setLevel(logging.NOTSET)



[docs]def is_module(path): """Checks if path is a module.""" fname, ext = os.path.splitext(path) if ext == ".py": return True try: # Python 3 allows modules not to have an __init__.py if any(os.path.splitext(x)[1] == ".py" for x in os.listdir(path)): return True except (FileNotFoundError, NotADirectoryError) as e: logger.debug(e, 'Skipping un-recognised file or directory in plug-in path') pass
[docs]def contains_setup_beard_py(path): """Checks if path contains setup_beard.py.""" return os.path.isfile(os.path.join(path, "setup_beard.py"))
[docs]class PythonPathContext: def __init__(self, path_to_add): self.path_to_add = path_to_add def __enter__(self): sys.path.insert(0, self.path_to_add) def __exit__(self, type, value, tb): assert sys.path[0] == self.path_to_add sys.path.pop(0)
[docs]def get_beard_config(config_file="../../config.yml"): """Attempts to load a yaml file in the beard directory. NOTE: The file location should be relative from where this function is called. """ # Sometimes external libraries change the logging level. This is not # acceptable, so we assert after importing a beard that it has not changed. logger_level_before = logger.getEffectiveLevel() logging.debug("logging level: {}".format(logger.getEffectiveLevel())) callers_frame = inspect.currentframe().f_back logger.debug("This function was called from the file: " + callers_frame.f_code.co_filename) base_path = os.path.dirname(callers_frame.f_code.co_filename) config = yaml.safe_load(open(os.path.join(base_path, config_file))) logging.debug("logging level: {}".format(logger.getEffectiveLevel())) logger_level_after = logger.getEffectiveLevel() assert logger_level_before == logger_level_after, \ "Something has changed the logger level!" return config
[docs]def setup_beard(beard_module_name, *, beard_python_path="python", beard_requirements_file="requirements.txt", config_file="config.yml", example_config_file="config.yml.example", copy_config=False): """Sets up a beard for use. Note: beard_python_path must be a path relative to the file setup_beard is called from. """ callers_frame = inspect.currentframe().f_back logger.debug("This function was called from the file: " + callers_frame.f_code.co_filename) base_path = os.path.dirname(callers_frame.f_code.co_filename) if copy_config: if not os.path.isfile(os.path.join(base_path, config_file)): logger.info("Attempting to copy config file.") shutil.copyfile( os.path.join(base_path, example_config_file), os.path.join(base_path, config_file), ) # Install requirements requirements_file = os.path.join(base_path, beard_requirements_file) if not pyconfig.get('no_auto_pip') and os.path.isfile(requirements_file): pip_args = [ 'pip', 'install', '-r', # A little sanitising # re.sub("[^a-z0-9./_-]", "", requirements_file) '"{}"'.format(requirements_file.replace("'", "")) ] if pyconfig.get('auto_pip_upgrade'): pip_args.append('--upgrade') # Using the library pip.main causes the logger level to change. subprocess.check_call(" ".join(pip_args), shell=True) # Invalidate import path cache, since it's probably changed if new # requirements have been installed importlib.invalidate_caches() # Import beard beard_python_path = os.path.join(base_path, beard_python_path) with PythonPathContext(beard_python_path): # Attempt to import the module named specified in the call to # setup_beard. # # Often, a module with the same name has already be imported, so the # module is reloaded to ensure that if a module is found in # beard_python_path called beard_module_name, *that* module is loaded. mod = importlib.import_module(beard_module_name) # If it's a namespace module, then it hasn't yet imported the new # module. Reload to get the new one. if is_namespace_module(mod): importlib.reload(mod)
[docs]def is_namespace_module(mod): """Check is the given module is a namespace module.""" return not hasattr(mod, '__file__')
[docs]def get_literal_path(path_or_autoloader): """Gets literal path from AutoLoader or returns input.""" try: return path_or_autoloader.path except AttributeError: assert type(path_or_autoloader) is str, "beard_path is not a str or an AutoLoader!" return path_or_autoloader
[docs]def get_literal_beard_paths(beard_paths): """Returns list of literal beard paths.""" return [get_literal_path(x) for x in beard_paths]
[docs]def all_possible_beards(paths): """List generator of all plug-ins that Skybeard has found and can be loaded""" literal_paths = get_literal_beard_paths(paths) for path in literal_paths: for f in (x for x in os.listdir(path) if not x.startswith(".")): if is_module(os.path.join(path, f)): yield os.path.basename(f)
[docs]def embolden(string): """wraps a string in bold tags""" return "<b>"+string+"</b>"
[docs]def italisize(string): """wraps a string in italic tags""" return "<i>"+string+"</i>"
[docs]def get_args(msg_or_text, return_string=False, **kwargs): """Helper function when the command used in the telegram chat may have arguments, e.g /command arg1 arg2. Returns a list of any arguments found after the command""" if "as_string" in kwargs: logger.warning( "as_string is being depreciated, please use return_string.") return_string = kwargs["as_string"] try: text = msg_or_text['text'] except TypeError: text = msg_or_text if return_string: return " ".join(text.split(" ")[1:]) else: return shlex.split(text)[1:]
[docs]def partition_text(text): """Generator for splitting long texts into ones below the character limit. Messages are split at the nearest line break and each successive chunk is yielded. Relatively untested""" if len(text) < 3500: yield text else: text_list = text.split('\n') l = 0 # length iterator of current block i = 0 # start position of block j = 0 # end position of block # j scans through list of lines from start position i l tracks length # of all characters in the current scan If length of everything from i # to j+1 > the limit, yield current block, joined into single string, # and shift the scanning position up to the start of the new block. for m in text_list: l += len(m) try: # if adding another line will breach the limit, # yield current block if l+len(text_list[j+1]) > 3500: indices = [i, j] yield '\n'.join( [msg for k, msg in enumerate(text_list) if k in indices]) # shift start position for the next block i = j+1 l = 0 j += 1 except IndexError: yield text_list[i]
BOT_JSON = None
[docs]async def getMe(): global BOT_JSON if not BOT_JSON: async with aiohttp.ClientSession() as session: async with session.get( "https://api.telegram.org/bot{}/getMe".format( pyconfig.get('key'))) as resp: BOT_JSON = (await resp.json())['result'] return BOT_JSON