Source code for pspman.define

#!/usr/bin/env python3
# -*- coding:utf-8; mode:python -*-
#
# Copyright 2020 Pradyumna Paranjape
# This file is part of pspman.
#
# pspman is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pspman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with pspman.  If not, see <https://www.gnu.org/licenses/>.
#
'''
Define variables from command line and defaults

'''


import os
import sys
import typing
import subprocess
from pathlib import Path
import argparse
import shutil
import argcomplete
from psprint import print
from . import CONFIG
from .config import MetaConfig
from .classes import InstallEnv
from .tools import timeout


[docs]def cli(config: MetaConfig = None) -> argparse.ArgumentParser: ''' Parse command line arguments Args: config: configuration to be modified by command line inputs Returns: modified ``confing`` ''' config = config or CONFIG description = ''' \033[1;91mNOTICE: This is only intended for "user" packages. CAUTION: DO NOT RUN THIS SCRIPT AS ROOT. CAUTION: If you still insist, I won't care.\033[m ''' d_pref = config.data_dir parser = argparse.ArgumentParser( description=description, formatter_class=argparse.RawTextHelpFormatter ) # base arguments parser.add_argument('--init', action='store_true', help='Initialize PSPMan') parser.add_argument('--version', action='store_true', help='Display version and exit') parser.add_argument('-l', '--list', action='store_true', dest='info', help='display list of cloned repositories and exit') parser.add_argument('-v', '--verbose', action='store_true', help='display verbose output') parser.add_argument('-s', '--stale', action='store_true', help='skip updates, let repositories remain stale') parser.add_argument('-o', '--only-pull', action='store_true', dest='pull', help='only pull, do not try to install') parser.add_argument('-f', '--force-risk', action='store_true', dest='risk', help='force working with root permissions [DANGEROUS]') parser.add_argument('-p', '--prefix', type=str, nargs='?', metavar='PREF', help=f'path for installation [default: {d_pref}]', default=d_pref) parser.add_argument('-c', '--clone-dir', type=str, nargs='?', default=None, metavar='C_DIR', help=f'''Clone git repos in C_DIR. Please check if you want to add this to PATH. [default: PREF{os.sep}src] ''') parser.add_argument('-r', '--reset', metavar='PROJ', type=str, nargs='*', default=[], help='clean-reset PROJ code') parser.add_argument('-d', '--delete', metavar='PROJ', type=str, nargs='*', default=[], help='delete PROJ') parser.add_argument('-i', '--install', metavar='URL', type=str, nargs='*', default=[], help=f''' format: "URL[___branch[___'only'|___inst_argv[___sh_env]]]" * *REMEMBER the QUOTATION MARKS* * URL: url to be cloned. * branch: custom branch to clone. Blank implies default. * pull_only: 'true', 'only', 'pull', 'hold' => Don't try to install this URL * inst_argv: Custom arguments. These are passed *raw* during installation. * sh_env: VAR1=VAL1,VAR2=VAL2,VAR3=VAL3.... Modified install environment. ''') parser.set_defaults(call_function=None) # sub-commands sub_parsers = parser.add_subparsers() version = sub_parsers.add_parser(name='version', aliases=['ver'], help='display version and exit') version.set_defaults(call_function='version') switch = sub_parsers.add_parser( name='switch', aliases=['activate', 'export'], help='switch to environment temporarily\n' + 'with additional *PATH variables from PREFIX') switch.add_argument('switch_to', type=str, metavar='GIT_GROUP|PATH', help="GIT_GROUP's name or path", nargs='?', default='default') switch.add_argument('-c', '--copy', action='store_true', dest='clipboard', help='try to copy soruce command to clipboard') switch.set_defaults(call_function='switch') unlock = sub_parsers.add_parser(name='unlock', aliases=[], help='Unlock C_DIR and exit') unlock.set_defaults(call_function='unlock') list_gits = sub_parsers.add_parser( name='list', aliases=['info'], help='display list of cloned repositories and exit' ) list_gits.add_argument('--meta', '-m', action='store_true', help='List known C_DIR(s)') list_gits.set_defaults(call_function='info') init = sub_parsers.add_parser(name='init', aliases=['initialize'], help='initialize pspman') init.add_argument('--ignore', '-i', type=str, metavar='DEP', nargs='*', help='initialize without dependency DEP') init.set_defaults(call_function='init') goodbye = sub_parsers.add_parser(name='goodbye', aliases=['de-initialize'], help='Cleanup before uninstalling pspman') goodbye.set_defaults(call_function='goodbye') return parser
[docs]def cli_opts(config: MetaConfig = None) -> typing.Dict[str, typing.Any]: ''' Parse cli arguments to return its dict ''' config = config or CONFIG parser = cli() argcomplete.autocomplete(parser) args = parser.parse_args() if args.info: setattr(args, 'call_function', 'info') if hasattr(args, 'meta'): if args.meta: setattr(args, 'call_function', 'meta') else: setattr(args, 'call_function', 'info') if args.version: setattr(args, 'call_function', 'version') if args.init: setattr(args, 'call_function', 'init') return vars(args)
[docs]def perm_pass(env: InstallEnv, permdir: Path) -> int: ''' Args: permdir: directory whose permissions are to be checked Returns: Error code: ``1`` if all rwx permissions are not granted ''' if env.verbose: print(f'Checking permissions for {permdir}') while not permdir.exists(): # clone/prefix directory get be created anew permdir = permdir.resolve().parent if env.verbose: print(f'Checking permissions for the parent: {permdir}') user = os.environ.get('USER', 'root') stdout, err = subprocess.Popen(['stat', '-L', '-c', "%U %G %a", permdir], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() if err: print('Error checking directory permissions, aborting...', mark=5) return 1 owner, group, octperm = stdout.replace("\n", '').split(' ') if (octperm[-1] == '7') != 0: # everyone has access return 0 if (octperm[-2] == '7') != 0: # some group has permissions stdout, err = subprocess.Popen(['groups', user], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() if err: # error print('Error checking group permissions, aborting...', mark=5) return 1 user_groups = stdout.split(' ') for u_grp in user_groups: if u_grp == group: return 0 if (octperm[-3] == '7') != 0: # owner has permissions if user == owner: return 0 print(f''' We [{user}] do not have sufficient permissions [{octperm}] on {owner}'s directory: {permdir} ''', mark=5) print('Try another location', mark=2) return 1
[docs]def prepare_env(env: InstallEnv) -> int: ''' Check permissions and create prefix and source directories Returns: Error code ''' # Am I root? if os.environ.get('USER', 'root').lower() == 'root': print('I hate dictators', mark=3) if not env.risk: print('Bye', mark=0) return 2 print('I can only hope you know what you are doing...', mark=3) print('Here is a chance to kill me in', mark=2) try: timeout(10) except: print("Aborting.", pref_color='g', pref=chr(0x1f197), short=False) return 1 print() print("Your decision", pref=chr(0x1f937), pref_color='r', text_color="y", short=False) print() print('Proceeding...', mark=1) else: # Is installation directory read/writable err = perm_pass(env=env, permdir=env.clone_dir) err += perm_pass(env=env, permdir=env.prefix) if err != 0: print('Bye', mark=0) return err env.clone_dir.mkdir(parents=True, exist_ok=True) env.prefix.mkdir(parents=True, exist_ok=True) return 0
[docs]def lock(env: InstallEnv, unlock: bool = False, message: str = None): ''' Unlock up the directory Args: env: installation context unlock: unlock existing locks? message: message to be written in the lockfile instead of pid Returns: Error code ''' lock_path = env.prefix.joinpath('.proc.lock') # lockfile is deliberately human-readable if lock_path.exists(): # directory is locked if unlock: # restore all backup databases for filetype in "healthy", "fail": backup_file = env.clone_dir.joinpath(f".pspman.{filetype}.yml") if backup_file.with_suffix('.yml.bak').is_file() and \ not backup_file.is_file(): backup_file.with_suffix(".yml.bak").replace(backup_file) temp_build = env.prefix.joinpath('temp_build') if temp_build.is_dir(): shutil.rmtree(temp_build) lock_path.unlink() return 1 with open(lock_path, 'r') as lock_fh: print(f"This git-group was locked for safety by {lock_fh.read()}", mark='err') print("Either wait for the process to get completed") print("OR interrupt the process and execute") print(f"pspman -p {env.prefix} unlock", mark='act') print("Interruption WILL generally MESS UP source codes.", mark='warn') return 2 if unlock: print(f'Lockfile {lock_path} not found.') return 2 with open(lock_path, 'w') as lock_fh: lock_fh.write(str(message) or 'pid:' + str(os.getpid())) return 0