Source code for pspman.queues

#!/usr/bin/env python3
# -*- coding:utf-8; mode:python -*-
#
# Copyright 2020 Pradyumna Paranjape
# This file is part of pspman.
#
# pspman is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pspman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with pspman.  If not, see <https://www.gnu.org/licenses/>.
#
'''
Command Queues

'''


import os
import sys
import typing
import multiprocessing
import time
import string
import socket
import random
import tempfile
from pathlib import Path
import json
import yaml
from . import print
from .classes import InstallEnv, GitProject, GitProjEncoder
from .fork_actions import delete, clone, update, install, success, failure
from .errors import ClosedQueueError
from .tag import TAG_ACTION, RET_CODE


class PSPQueue:
    '''
    Base FIBO (File In, Batch Out) queue to push and retrieve tasks.

    Creating a queue forks a child process (see ``start``). The parent
    streams projects to the child over a unix stream socket as
    length-prefixed JSON messages; the child collects them and runs
    ``action`` on them in batches.

    Attributes:
        env: installation context
        queue: queued ``GitProjects``, keyed by project name
        downstream_qs: queues fed by this queue

            * success: successful projects are pushed here
            * fail: failed projects are pushed here

        upstream_qs: upstream queues feeding to this queue
        q_type: type of queue
        pid: pid of child process
        closed: Is the queue closed? (set by function ``done``)

    Args:
        env: installation context
        action: procedure to perform on each ``GitProject`` in the queue;
            called with one ``(env, project)`` tuple, its return value is
            indexed as ``(name, ..., tag, return_code)``
        fail_q: push to this queue, if action fails
            (used only when the ``fail`` keyword is not supplied)
        **kwargs:
            * success: push to this queue, if action succeeds
            * fail: push to this queue, if action fails
            * items: Optional[Dict[str, GitProject]]: initialize with items
            * q_type: type of queue

    '''

    # every message starts with its payload length, big-endian, this wide
    _HEADER_BYTES = 64

    def __init__(self, env: 'InstallEnv', action: typing.Callable,
                 fail_q: typing.Optional['PSPQueue'] = None,
                 **kwargs):
        self.env = env
        self._parallel = len(os.sched_getaffinity(0))
        self.upstream_qs: typing.List['PSPQueue'] = []

        # ``fail_q`` used to be accepted but ignored; honour it as fallback
        fail = kwargs.get('fail')
        if fail is None:
            fail = fail_q
        self.downstream_qs: typing.Dict[str, typing.Optional['PSPQueue']] = {
            'success': kwargs.get('success'),
            'fail': fail,
        }
        # NOTE: compare with ``None`` explicitly, an empty queue is falsy
        # because ``__len__`` is defined
        for ds_q in self.downstream_qs.values():
            if ds_q is not None:
                ds_q.upstream_qs.append(self)

        self.action = action
        items = kwargs.get('items')
        self.queue: typing.Dict[str, 'GitProject'] = \
            {} if items is None else items.copy()
        self.q_type: str = kwargs.get('q_type', 'base')
        # sockets must exist before the fork so that both processes own them
        self._server, self._client = self._create_sockets()
        self.closed = False
        self.pid = self.start()

    def _create_sockets(self) -> typing.Tuple[socket.socket, socket.socket]:
        '''
        Parent: Create sockets

        Returns:
            ``(server, client)``: listening server socket and a client
            socket already connected to it

        '''
        name = ''.join(random.choices(
            string.ascii_uppercase + string.digits, k=10
        ))
        name = os.path.join(tempfile.gettempdir(),
                            name + f"_{self.q_type}_pspman.sock")
        # Child serves, parent process is the client
        server = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
        server.bind(name)
        server.listen()
        client = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
        client.connect(name)
        # The connection is already queued on the server; the path is not
        # needed any more, remove it so that socket files do not pile up.
        try:
            os.unlink(name)
        except OSError:
            # best-effort clean-up, the sockets themselves are unaffected
            pass
        return server, client

    def add(self, project: 'GitProject' = None) -> None:
        '''
        Parent: Add project to queue at the end

        Args:
            project: project to queue (``None`` is silently ignored)

        Raises:
            ClosedQueueError: the client socket has been closed

        '''
        # NOTE(review): this tests the socket, not ``self.closed`` that is
        # set by ``done`` -- confirm which one is intended.
        if not self._client._closed:  # type: ignore
            # send this to child
            self.copy_to_server(project)
        else:
            raise ClosedQueueError(self)

    def __len__(self) -> int:
        '''
        Parent/child: number of objects in the queue
        '''
        return len(self.queue)

    def run_batch(self) -> None:
        '''
        Child: Run ``action`` on all items in the queue using a process pool

        Every processed project is removed from the queue, tagged with the
        tag returned by ``action`` and handed to ``on_success`` or
        ``on_failure`` according to the returned code.
        '''
        while len(self):
            n_wrkrs = min(self._parallel, len(self))
            if self.env.verbose:
                print(f'Spawned {n_wrkrs} {self.q_type} worker(s)', mark='act')
                print('For projects:', mark='act')
                for p_name in self.queue:
                    print(p_name, mark='list')
            jobs = [(self.env, project) for project in self.queue.values()]
            with multiprocessing.Pool(n_wrkrs) as pool:
                results: typing.List[typing.Tuple[str, int, int]] = \
                    pool.map(self.action, jobs)
            for res in results:
                # res: (name, ..., tag, return code)
                project = self.queue.pop(res[0])
                project.tag = res[-2]
                if res[-1] == RET_CODE['pass']:
                    self.on_success(project)
                elif res[-1] == RET_CODE['fail']:
                    self.on_failure(project)
            if self.env.verbose:
                print(f"Processed {n_wrkrs} {self.q_type} actions", mark=2)

    def on_success(self, project: 'GitProject'):
        '''
        run on success: push project to the ``success`` queue, if any
        '''
        if self.downstream_qs['success'] is not None:
            self.downstream_qs['success'].add(project)

    def on_failure(self, project: 'GitProject'):
        '''
        run on failure: push project to the ``fail`` queue, if any
        '''
        if self.downstream_qs['fail'] is not None:
            self.downstream_qs['fail'].add(project)

    def done(self, caller: 'PSPQueue' = None) -> None:  # type: ignore
        '''
        Parent: No more items will be added.
        Finish current list and exit

        Args:
            caller: upstream queue that has finished; the close instruction
                is sent only after all upstream queues have reported

        Raises:
            ValueError: ``caller`` is not a registered upstream queue

        '''
        if caller is not None:
            self.upstream_qs.remove(caller)
        if len(self.upstream_qs) != 0:
            # some upstream queue may still feed this queue
            return
        if self.env.verbose:
            print(f"Closing Queue {self.q_type}", mark='bug')
        # a zero-length header is the 'close' instruction
        self._client.sendall(
            (0).to_bytes(length=self._HEADER_BYTES, byteorder='big'))
        self.closed = True

    def start(self) -> int:
        '''
        Parent: Start a batch run

        Forks. The child serves the queue until the 'close' instruction is
        received, then notifies the downstream queues and exits; it never
        returns from this function.

        Returns:
            parent: pid of the child (>0)

        '''
        pid = os.fork()
        if pid == 0:
            # child server
            pipe, _ = self._server.accept()
            while not self._server._closed:  # type: ignore
                # gather a batch, unless input gets closed before that
                while len(self) < self._parallel:
                    close = self.copy_from_client(pipe)
                    if close and not self._server._closed:  # type: ignore
                        self._server.close()
                        break
                self.run_batch()
            else:
                if self.downstream_qs['success'] is not None:
                    self.downstream_qs['success'].done(self)
                if self.downstream_qs['fail'] is not None:
                    self.downstream_qs['fail'].done(self)
            sys.exit(0)
        else:
            # parent client
            pass
        return pid

    @staticmethod
    def _recv_exact(pipe: socket.socket, n_bytes: int) -> bytes:
        '''
        Read exactly ``n_bytes`` from a stream socket.

        A single ``recv`` may return fewer bytes than requested, hence
        the loop.

        Returns:
            the bytes read; shorter than ``n_bytes`` only if the peer
            closed the connection first

        '''
        parts: typing.List[bytes] = []
        remaining = n_bytes
        while remaining > 0:
            part = pipe.recv(remaining)
            if not part:
                # peer closed the connection
                break
            parts.append(part)
            remaining -= len(part)
        return b''.join(parts)

    def copy_from_client(self, pipe: socket.socket) -> bool:
        '''
        Child: copy parent's queue

        Args:
            pipe: connection accepted from the parent

        Returns:
            ``True`` if 'close' instruction is received
            (or the parent hung up), else ``False``

        '''
        header = self._recv_exact(pipe, self._HEADER_BYTES)
        if len(header) < self._HEADER_BYTES:
            # connection lost: nothing more can arrive
            return True
        chunk = int.from_bytes(header, byteorder='big')
        if chunk == 0:
            # input closed
            return True
        payload = self._recv_exact(pipe, chunk)
        if len(payload) < chunk:
            # truncated message: the parent went away mid-transfer
            return True
        for name, data in json.loads(payload.decode('utf-8')).items():
            self.queue[name] = GitProject(data=data)
        return False

    def copy_to_server(self, project: 'GitProject' = None):
        '''
        Parent: copy to child's queue

        The project is sent together with whatever the parent-side queue
        still holds; the parent-side queue is emptied afterwards.

        Args:
            project: project to send (``None`` is silently ignored)

        '''
        if project is None:
            return
        self.queue[project.name] = project
        payload = json.dumps(self.queue, cls=GitProjEncoder).encode('utf-8')
        header = len(payload).to_bytes(length=self._HEADER_BYTES,
                                       byteorder='big')
        # ``sendall``: plain ``send`` may transmit only part of the buffer
        self._client.sendall(header + payload)
        self.queue = {}

    def __repr__(self) -> str:
        '''
        Representation of queue
        '''
        represent: typing.List[str] = [f'Queue: {self.q_type}']
        represent.append(f"On Success: {self.downstream_qs['success'].q_type}"
                         if self.downstream_qs['success'] is not None
                         else 'On Success: ``None``')
        represent.append(f"On fail: {self.downstream_qs['fail'].q_type}"
                         if self.downstream_qs['fail'] is not None
                         else 'On fail: ``None``')
        represent.append("Upstream feeds:")
        for up_q in self.upstream_qs:
            represent.append("\t" + f"{up_q.q_type}")
        represent.append(f'contents: {len(self)} items')
        return '\n'.join(represent)
class TermQueue(PSPQueue):
    '''
    Terminal queue: records the outcome of each project in YAML files
    kept in ``env.clone_dir`` rather than pushing it further downstream.

    Args:
        env: installation context
        action: procedure to perform on each ``GitProject`` in the queue
        q_type: type of queue
        **kwargs: passed on to ``PSPQueue``

    '''
    def __init__(self, env: InstallEnv, action: typing.Callable,
                 q_type: str = 'terminal', **kwargs):
        super().__init__(env, action, q_type=q_type, **kwargs)

    def on_success(self, project: GitProject):
        '''
        Child: store GitProject state

        Appends the project's attributes to ``.pspman.healthy.yml`` and
        a ``None`` entry under the same name to ``.pspman.fail.yml``
        (presumably to void an earlier failure record -- confirm against
        the code that reads these files).
        '''
        entries = (
            ('.pspman.healthy.yml', project.__dict__),
            ('.pspman.fail.yml', None),
        )
        for db_name, state in entries:
            with open(self.env.clone_dir.joinpath(db_name), 'a') as db_handle:
                yaml.dump({project.name: state}, db_handle)

    def on_failure(self, project: GitProject):
        '''
        Child: store GitProject state

        Appends the project's attributes to ``.pspman.fail.yml``.
        '''
        fail_db = self.env.clone_dir.joinpath('.pspman.fail.yml')
        record = {project.name: project.__dict__}
        with open(fail_db, 'a') as fail_handle:
            yaml.dump(record, fail_handle)
class SuccessQueue(TermQueue):
    '''
    Terminal queue to register successful projects.

    Args:
        env: installation context
        **kwargs: passed on to ``TermQueue``

    '''
    def __init__(self, env: InstallEnv, **kwargs):
        super().__init__(env, success, q_type='success', **kwargs)
class FailQueue(TermQueue):
    '''
    Terminal queue to register failed projects.

    Args:
        env: installation context
        **kwargs: passed on to ``TermQueue``

    '''
    def __init__(self, env: InstallEnv, **kwargs):
        super().__init__(env, failure, q_type='fail', **kwargs)
class DeleteQueue(TermQueue):
    '''
    Queue for projects to delete

    Args:
        env: installation context
        success: queue registered as ``success`` downstream
        fail: queue registered as ``fail`` downstream
        **kwargs: passed on to ``TermQueue``

    '''
    def __init__(self, env: InstallEnv, success: PSPQueue, fail: PSPQueue,
                 **kwargs):
        super().__init__(env, delete, q_type='delete',
                         success=success, fail=fail, **kwargs)

    def on_success(self, project: GitProject):
        '''
        run on success

        Appends a ``None`` entry for the project to both state files, then
        calls ``add(None)`` on the success queue; no project is handed over.
        '''
        for db_name in ('.pspman.healthy.yml', '.pspman.fail.yml'):
            with open(self.env.clone_dir.joinpath(db_name), 'a') as db_handle:
                yaml.dump({project.name: None}, db_handle)
        success_q = self.downstream_qs['success']
        if success_q is not None:
            success_q.add(None)
class InstallQueue(PSPQueue):
    '''
    Queue of projects to install

    Args:
        env: installation context
        success: push to this queue, if installation succeeds
        fail: push to this queue, if installation fails
        **kwargs: passed on to ``PSPQueue``

    '''
    def __init__(self, env: InstallEnv, success: PSPQueue, fail: PSPQueue,
                 **kwargs):
        super().__init__(env, install, q_type='install',
                         success=success, fail=fail, **kwargs)
class PullQueue(PSPQueue):
    '''
    Queue of source codes to pull

    Args:
        env: installation context
        success: push to this queue, if pull succeeds
        fail: push to this queue, if pull fails
        **kwargs: passed on to ``PSPQueue``

    '''
    def __init__(self, env: InstallEnv, success: PSPQueue, fail: PSPQueue,
                 **kwargs):
        super().__init__(env, update, q_type='pull',
                         success=success, fail=fail, **kwargs)

    def on_success(self, project: GitProject):
        '''
        run on success: stamp the update time, then forward the project
        to the ``success`` queue (if any)
        '''
        project.mark_update_time()
        # the base class does the forwarding to the success queue
        super().on_success(project)
class CloneQueue(PSPQueue):
    '''
    Queue of projects to clone

    Args:
        env: installation context
        success: push to this queue, if clone succeeds
        fail: push to this queue, if clone fails
        **kwargs: passed on to ``PSPQueue``

    '''
    def __init__(self, env: InstallEnv, success: PSPQueue, fail: PSPQueue,
                 **kwargs):
        super().__init__(env, clone, q_type='clone',
                         success=success, fail=fail, **kwargs)

    def on_success(self, project: GitProject):
        '''
        run on success: stamp the update time, then forward the project
        to the ``success`` queue (if any)
        '''
        project.mark_update_time()
        # the base class does the forwarding to the success queue
        super().on_success(project)