Source code for hpycc.connection

"""
Object for connecting to a HPCC instance.

This module provides a `Connection` class to connect to a HPCC
instance. This connection is used as the first input to the majority
of public functions in the `hpycc` package.

Classes
-------
- `Connection` -- HPCC connection class.

"""
__all__ = ["Connection"]

import collections
import os
import random
import requests
from requests.exceptions import HTTPError, RetryError
import subprocess
from tempfile import TemporaryDirectory
from time import sleep
from warnings import warn
from urllib import parse
from json import JSONDecodeError
from simplejson.errors import JSONDecodeError as simpleJSONDecodeError
from math import ceil

from hpycc.utils.parsers import parse_wuid_from_xml
from hpycc import delete


def check_ecl_cmd(cmd='ecl'):
    """
    Check that executionable is on the system path
    """

    try:
        subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            stdin=subprocess.PIPE, shell=False)
    except FileNotFoundError:
        if cmd == 'ecl':
            warn("ecl is not on the system path! You may continue but "
                 "functionality will be limited to get and save_thor_file and "
                 "deleting workunits. All other tasks will likely fail.")
        elif cmd == 'eclcc':
            warn("eclcc is not on the system path! You may continue but "
                 "you will be unable to syntax check")
        else:
            warn("Command line check for %s failed" % cmd)


[docs]class Connection: def __init__(self, username, server="localhost", port=8010, repo=None, password="password", legacy=False, test_conn=True): """ Connection to a HPCC instance. Takes the connection details of a HPCC instance and allows ECL scripts to be executed using pythonic commands. Note that this requires both ecl.exe and eclcc.exe to be in the path. Parameters ---------- username : str The username to provide to HPCC. server : str, optional The ip address of the HPCC instance in the form `XX.XX.XX.XX`. Neither `http://` nor the port number should be included. 'localhost' by default. port : int, optional The port number ECL Watch is running on on the HPCC instance. 8010 by default. repo : str, list, None, optional A path to a location or locations to search for ecl imports. A single path can be a str, multiple paths should be in an iterable. None by default. password : str Password to provide to HPCC alongside the username. "password" by default. Note: this cannot be blank. legacy : bool, optional Should legacy flag should be enabled when executing ECL commands. False by default. test_conn : bool, optional Test connection to the server on initialisation. True by default. Attributes ---------- username: str The username to provide to HPCC. server: str The ip address of the HPCC instance in the form 'XX.XX.XX.XX' or 'localhost'. Note that neither `http://` nor the port number is included. port: int The port number ECL Watch is running on on the HPCC instance. repo: str, list-like or None, optional A path to a location or locations to search for ecl imports. A single path can be a str, multiple paths should be in an iterable. None by default. password: str Password to provide to HPCC alongside the username. legacy: bool If the legacy flag is enabled when executing ECL commands. """ if not isinstance(username, str) or not username: raise AttributeError("username must be a string, not {}".format( username)) if not isinstance(password, str) or not password.strip(): raise AttributeError("password must be a string, not {}".format( password)) self.server = server self.username = username self.port = port self.repo = repo self.password = password self.legacy = legacy if test_conn: self.test_connection() check_ecl_cmd('ecl') check_ecl_cmd('eclcc')
[docs] def test_connection(self): """ Assert that the `Connection` can connect to the HPCC instance. This method attempts to connect to ECL Watch using its `server` and `port` attributes. The credentials provided are its `username` and `password`. Returns ------- True Raises ------ Exception: If the connection fails, the relevant exception is raised. """ r = requests.get("http://{}:{}".format(self.server, self.port), auth=(self.username, self.password), timeout=5) r.raise_for_status() return True
@staticmethod def _run_command(cmd): """ Run a command line process. """ result = subprocess.run(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE) stderr = result.stderr.decode('utf-8') if result.returncode: msg = 'Failed Workunit or command line call! Response: {}'.format( result) raise subprocess.SubprocessError(msg) stdout = result.stdout.decode("utf-8") Result = collections.namedtuple("Result", ["stdout", "stderr"]) result_tuple = Result(stdout, stderr) return result_tuple
[docs] def check_syntax(self, script): """ Run an ECL syntax check on an ECL script. Uses eclcc to run a syntax check on `script`. If the syntax check fails, ie. an error is present, a SyntaxError will be raised. Note that this requires that `eclcc.exe` is on the path. Attributes `legacy` and `repo` are also used. Parameters ---------- script: str path to ECL script. Returns ------- None Raises ------ SyntaxError If the script fails the syntax check. """ b = ["eclcc", "-syntax"] b += self._legacy_arg b += self._repo_arg b += [script] try: self._run_command(b) except subprocess.SubprocessError as e: raise SyntaxError(e)
@property def _repo_arg(self): r = [self.repo] if isinstance(self.repo, str) else self.repo try: return ["-I={}".format(repo) for repo in r] except TypeError: return [] @property def _legacy_arg(self): if self.legacy: return ["-legacy"] else: return []
[docs] def run_ecl_script(self, script, syntax_check, delete_workunit, stored): """ Run an ECL script and return the stdout and stderr. Run the ECL script `script` on the HPCC instance at `server`:`port`, using the credentials `username` and `password`. If `syntax_check`, run a syntax check before execution. Attributes `legacy` and `repo` are also used. Parameters ---------- script: str path to ECL script. syntax_check: bool If a syntax check should be ran before the script is executed. delete_workunit: bool Delete workunit once completed. stored : dict or None Key value pairs to replace stored variables within the script. Values should be str, int or bool. Returns ------- result: namedtuple NamedTuple in the form (stdout, stderr). Raises ------ subprocess.CalledProcessError: If script fails syntax check. See Also -------- syntax_check run_ecl_string """ base_cmd = ['ecl', 'run', '-v', '--server={}'.format(self.server), '--port={}'.format(self.port), '--username={}'.format(self.username), '--password={}'.format(self.password)] base_cmd += self._legacy_arg base_cmd += ['thor', script] base_cmd += self._repo_arg stored = stored or {} for key, value in stored.items(): string = '-X{}={}'.format(key, value) base_cmd.append(string) if syntax_check: self.check_syntax(script) try: result = self._run_command(base_cmd) except subprocess.SubprocessError as e: msg = "Failed to run ecl command" raise subprocess.SubprocessError(msg) from e if delete_workunit: wuid = parse_wuid_from_xml(result.stdout) delete.delete_workunit(self, wuid) return result
[docs] def run_url_request(self, url, max_attempts, max_sleep): """ Return the contents of a url. Use attributes `username` and `password` to return the contents of `url`. Parameter `max_attempts` can be used to retry if an exception is raised. Each attempt is delayed by up to `max_sleep` seconds, so a large number of retries may be slow. Parameters ---------- url: str URL to query. max_attempts: int Maximum number of times url should be queried in the case of an exception being raised. max_sleep: int Maximum time, in seconds, to sleep between attempts. The true sleep time is a random int between `max_sleep` and `max_sleep` * 0.75. Returns ------- r: requests.models.Response Response object from `url` Raises ------ requests.exceptions.RetryError: If max_attempts is exceeded. """ min_sleep = int(ceil(max_sleep*0.75)) attempts = 0 while attempts < max_attempts: try: r = requests.get(url, auth=(self.username, self.password)) r.raise_for_status() return r except (HTTPError, ValueError) as e: attempts += 1 time_to_sleep = random.randint(min_sleep, max_sleep) sleep(time_to_sleep) if attempts == max_attempts: raise RetryError(e)
[docs] def get_chunk_from_hpcc(self, logical_file, start_row, n_rows, max_attempts, max_sleep): """ Using the HPCC instance at `server`:`port` and the credentials `username` and `password`, return the JSON response to a request for a part of a `logical_file`. Starting at `start row` and `n_rows` long. Parameters ---------- logical_file: str Name of logical file. start_row: int First row to return where 0 is the first row of the dataset. n_rows: int Number of rows to return. max_attempts: int Maximum number of times url should be queried in the case of an exception being raised. max_sleep: int Maximum time, in seconds, to sleep between attempts. The true sleep time is a random int between `max_sleep` and `max_sleep` * 0.75. Returns ------- resp: json JSON formatted response containing rows and all associated metadata. """ url = ("http://{}:{}/WsWorkunits/WUResult.json?LogicalName={}" "&Cluster=thor&Start={}&Count={}").format( self.server, self.port, parse.quote_plus(logical_file), start_row, n_rows) resp = self.run_url_request(url, max_attempts, max_sleep) try: resp = resp.json() except (JSONDecodeError, simpleJSONDecodeError) as exc: msg = ("response can't be parsed as JSON:\n{}".format(resp)) raise type(exc)(msg, exc.doc, exc.pos) from exc return resp
[docs] def get_logical_file_chunk(self, logical_file, start_row, n_rows, max_attempts, max_sleep): """ Return a chunk of a logical file from an HPCC instance. Using the HPCC instance at `server`:`port` and the credentials `username` and `password`, return a chunk of `logical_file` which starts at row `start_row` and is `n_rows` long. Parameters ---------- logical_file: str Name of logical file. start_row: int First row to return where 0 is the first row of the dataset. n_rows: int Number of rows to return. max_attempts: int Maximum number of times url should be queried in the case of an exception being raised. max_sleep: int Maximum time, in seconds, to sleep between attempts. The true sleep time is a random int between `max_sleep` and `max_sleep` * 0.75. Returns ------- result_response: pd.DataFrame, str Rows of logical file as list of dicts. In the form [{"col1": 1, "col2": 2}, {"col1": 1, "col2": 2}, ...]. """ # TODO: This should be an internal function. resp = self.get_chunk_from_hpcc(logical_file, start_row, n_rows, max_attempts, max_sleep) try: resp = resp["WUResultResponse"]["Result"]["Row"] except (KeyError, TypeError) as exc: msg = ("json can't be parsed as a WU:\n{}".format(resp)) raise type(exc)(msg) from exc return {key: [a_dict[key] for a_dict in resp] for key in resp[0]}
[docs] def run_ecl_string(self, string, syntax_check, delete_workunit, stored): """ Run an ECL string and return the stdout and stderr. Run the ECL string `string` on the HPCC instance at `server`:`port`, using the credentials `username` and `password`. If `syntax_check`, run a syntax check before execution. Attributes `legacy` and `repo` are also used. Parameters ---------- string: str ECL script as a string. syntax_check: bool If a syntax check should be ran before the script is executed. delete_workunit: bool Delete workunit once completed. stored : dict or None Key value pairs to replace stored variables within the script. Values should be str, int or bool. Returns ------- result: namedtuple NamedTuple in the form (stdout, stderr). Raises ------ SyntaxError: If script fails syntax check. See Also -------- syntax_check run_ecl_script """ with TemporaryDirectory() as d: p = os.path.join(d, "ecl_string.ecl") with open(p, "w+") as file: file.write(string) r = self.run_ecl_script(p, syntax_check, delete_workunit, stored) return r