# Source code for hpycc.get

"""
Functions to get data out of a HPCC instance.

This module contains functions to get either the output(s) of an
ECL script, or the contents of a logical file. The first input to all
functions is an instance of `Connection`.

Functions
---------
- `get_output` -- Return the first output of an ECL script.
- `get_outputs` -- Return all outputs of an ECL script.
- `get_thor_file` -- Return the contents of a thor file.

"""
__all__ = ["get_output", "get_outputs", "get_thor_file"]

from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import warnings
import pandas as pd
from hpycc.utils import filechunker
from hpycc.utils.parsers import parse_xml, parse_schema_from_xml, apply_custom_dtypes
from math import ceil


def get_output(connection, script, syntax_check=True, delete_workunit=True,
               stored=None):
    """
    Return the first output of an ECL script as a pandas.DataFrame.

    Note that whilst attempts are made to preserve the datatypes of
    the result, anything with an ambiguous type will revert to a string.
    If the first output of the ECL script is an empty dataset (or if the
    script does not output anything), a warning is issued and an empty
    pandas.DataFrame is returned.

    Parameters
    ----------
    connection: hpycc.Connection
        HPCC Connection instance, see also `Connection`.
    script: str
        Path of script to execute.
    syntax_check: bool, optional
        Should the script be syntax checked before execution? True by
        default.
    delete_workunit: bool, optional
        Delete workunit once completed. True by default.
    stored : dict or None, optional
        Key value pairs to replace stored variables within the
        script. Values should be str, int or bool. None by default.

    Returns
    -------
    parsed: pandas.DataFrame
        pandas.DataFrame of the first output of `script`.

    Raises
    ------
    SyntaxError:
        If script fails syntax check.

    See Also
    --------
    get_outputs
    save_output
    Connection.syntax_check

    Examples
    --------
    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write("OUTPUT(2);")
    >>> hpycc.get_output(conn, "example.ecl")
        Result_1
    0          2

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write("OUTPUT(2);OUTPUT(3);")
    >>> hpycc.get_output(conn, "example.ecl")
        Result_1
    0          2

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write(
    ...         "a:= DATASET([{'1', 'a'}],"
    ...         "{STRING col1; STRING col2});"
    ...         "OUTPUT(a);")
    >>> hpycc.get_output(conn, "example.ecl")
       col1 col2
    0     1    a

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write(
    ...         "a:= DATASET([{'a', 'a'}],"
    ...         "{STRING col1;});"
    ...         "OUTPUT(a(col1 != 'a'));")
    >>> hpycc.get_output(conn, "example.ecl")
    Empty DataFrame
    Columns: []
    Index: []

    """
    result = connection.run_ecl_script(
        script, syntax_check, delete_workunit, stored)
    # Strip Windows line endings so each dataset sits on a single line
    # for the (non-DOTALL) regex below.
    stdout = result.stdout.replace("\r\n", "")

    # The content group is `.*?` (as in `get_outputs`) rather than `.+?`:
    # with `.+?` an empty first dataset cannot match on its own, so the
    # match would stretch into the *next* dataset and return its rows.
    regex = r"<Dataset name='(?P<name>.+?)'>(?P<content>.*?)</Dataset>"
    match = re.search(regex, stdout)

    # Test the match explicitly instead of catching AttributeError, which
    # would also hide an AttributeError raised inside `parse_xml`.
    if match is None or not match.group("content"):
        parsed = pd.DataFrame()
    else:
        parsed = parse_xml(match.group())

    if len(parsed) == 0:
        warnings.warn(
            "The output does not appear to contain a dataset. "
            "Returning an empty DataFrame.")

    return parsed
def get_outputs(connection, script, syntax_check=True, delete_workunit=True,
                stored=None):
    """
    Return all outputs of an ECL script.

    Note that whilst attempts are made to preserve the datatypes of
    the result, anything with an ambiguous type will revert to a string.
    A warning is issued if any output has no content.

    Parameters
    ----------
    connection: hpycc.Connection
        HPCC Connection instance, see also `Connection`.
    script: str
        Path of script to execute.
    syntax_check: bool, optional
        Should the script be syntax checked before execution? True by
        default.
    delete_workunit: bool, optional
        Delete the workunit once completed. True by default.
    stored : dict or None, optional
        Key value pairs to replace stored variables within the
        script. Values should be str, int or bool. None by default.

    Returns
    -------
    as_dict: dict of pandas.DataFrames
        Outputs of `script` in the form
        {output_name: pandas.DataFrame}. Spaces in output names are
        replaced with underscores.

    Raises
    ------
    SyntaxError:
        If script fails syntax check.

    See Also
    --------
    get_output
    save_outputs
    Connection.syntax_check

    Examples
    --------
    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write("OUTPUT(2);")
    >>> hpycc.get_outputs(conn, "example.ecl")
    {Result_1:
        Result_1
    0          2
    }

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write(
    ...         "a:= DATASET([{'1', 'a'}],"
    ...         "{STRING col1; STRING col2});"
    ...         "OUTPUT(a);")
    >>> hpycc.get_outputs(conn, "example.ecl")
    {Result_1:
       col1 col2
    0     1    a
    }

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write(
    ...         "a:= DATASET([{'1', 'a'}],"
    ...         "{STRING col1; STRING col2});"
    ...         "OUTPUT(a);"
    ...         "OUTPUT(a);")
    >>> hpycc.get_outputs(conn, "example.ecl")
    {Result_1:
       col1 col2
    0     1    a,
    Result_2:
       col1 col2
    0     1    a
    }

    >>> import hpycc
    >>> conn = hpycc.Connection("user")
    >>> with open("example.ecl", "w") as file:
    ...     file.write(
    ...         "a:= DATASET([{'1', 'a'}],"
    ...         "{STRING col1; STRING col2});"
    ...         "OUTPUT(a);"
    ...         "OUTPUT(a, NAMED('ds_2'));")
    >>> hpycc.get_outputs(conn, "example.ecl")
    {Result_1:
       col1 col2
    0     1    a,
    ds_2:
       col1 col2
    0     1    a
    }

    """
    result = connection.run_ecl_script(
        script, syntax_check, delete_workunit, stored)
    # Strip Windows line endings so each dataset sits on a single line
    # for the (non-DOTALL) regex below.
    stdout = result.stdout.replace("\r\n", "")

    regex = r"<Dataset name='(?P<name>.+?)'>(?P<content>.*?)</Dataset>"
    # With two groups, findall yields (name, content) tuples.
    results = re.findall(regex, stdout)

    if any(content == "" for _, content in results):
        warnings.warn(
            "One or more of the outputs do not appear to contain a dataset. "
            "They have been replaced with an empty DataFrame")

    # NOTE(review): empty content is still handed to `parse_xml`; the
    # warning above assumes it yields an empty DataFrame -- confirm.
    as_dict = {
        name.replace(" ", "_"): parse_xml(content)
        for name, content in results
    }

    return as_dict
def get_thor_file(connection, thor_file, max_workers=10, chunk_size='auto',
                  max_attempts=3, max_sleep=60, dtype=None):
    """
    Return a thor file as a pandas.DataFrame.

    Note: Ordering of the resulting DataFrame is not deterministic and
    may not be the same as on the HPCC cluster.

    Parameters
    ----------
    connection: hpycc.Connection
        HPCC Connection instance, see also `Connection`.
    thor_file: str
        Name of thor file to be downloaded.
    max_workers: int, optional
        Number of concurrent threads to use when downloading file.
        Warning: too many may cause instability! 10 by default.
    chunk_size: int or 'auto', optional
        Size of chunks to use when downloading file. If 'auto' this is
        rows / max_workers (rounded up), except that the file is fetched
        as a single chunk when that figure is below 10,000 and chunks are
        capped at 325,000 rows. If given explicitly no limits are
        enforced. 'auto' by default.
    max_attempts: int, optional
        Maximum number of times a chunk should attempt to be
        downloaded in the case of an exception being raised.
        3 by default.
    max_sleep: int, optional
        Maximum time, in seconds, to sleep between attempts.
        The true sleep time is a random int between `max_sleep` and
        `max_sleep` * 0.75. 60 by default.
    dtype: type name or dict of col -> type, optional
        Data type for data or columns. E.g. {'a': np.float64, 'b':
        np.int32}. If None, or columns are missing from the provided
        dict, they will be converted to one of bool, str or int based on
        the HPCC datatype. None by default.

    Returns
    -------
    df: pandas.DataFrame
        Thor file as a pandas.DataFrame. If the file reports no rows an
        empty DataFrame with the schema's columns is returned.

    Raises
    ------
    KeyError, TypeError:
        If the schema or row count cannot be found in the response
        returned for the first row of the file.

    See Also
    --------
    save_thor_file

    Examples
    --------
    >>> import hpycc
    >>> import pandas
    >>> conn = hpycc.Connection("user")
    >>> df = pandas.DataFrame({"col1": [1, 2, 3]})
    >>> df.to_csv("example.csv", index=False)
    >>> hpycc.spray_file(conn, "example.csv", "example")
    >>> hpycc.get_thor_file(conn, "example")
       col1
    0     1
    1     2
    2     3

    >>> import hpycc
    >>> import pandas
    >>> conn = hpycc.Connection("user")
    >>> df = pandas.DataFrame({"col1": [1, 2, 3]})
    >>> df.to_csv("example.csv", index=False)
    >>> hpycc.spray_file(conn, "example.csv", "example")
    >>> hpycc.get_thor_file(conn, "example", dtype=str)
      col1
    0  '1'
    1  '2'
    2  '3'

    """
    # Fetch a single row first: the response carries the schema and the
    # total row count needed to plan the chunked download.
    resp = connection.get_chunk_from_hpcc(
        thor_file, 0, 1, max_attempts, max_sleep)
    try:
        wuresultresponse = resp["WUResultResponse"]
        schema_str = wuresultresponse["Result"]["XmlSchema"]["xml"]
        schema = parse_schema_from_xml(schema_str)
        schema = apply_custom_dtypes(schema, dtype)
        num_rows = wuresultresponse["Total"]
    except (KeyError, TypeError) as exc:
        msg = "Can't find schema in returned json: {}".format(resp)
        raise type(exc)(msg) from exc

    # Nothing to download. Checked before sizing chunks so that a missing
    # (None) row count cannot blow up in the arithmetic below.
    if not num_rows:
        return pd.DataFrame(columns=schema.keys())

    if chunk_size == 'auto':
        # Automagically optimise. TODO: we could use width too.
        suggested_size = ceil(num_rows / max_workers)
        if suggested_size < 10000:
            # Don't chunk small stuff.
            chunk_size = num_rows
        elif suggested_size > 325000:
            # More chunks than workers for big stuff.
            chunk_size = 325000
        else:
            chunk_size = suggested_size

    chunks = filechunker.make_chunks(num_rows, chunk_size)

    results = {key: [] for key in schema}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(connection.get_logical_file_chunk, thor_file,
                            start_row, n_rows, max_attempts, max_sleep)
            for start_row, n_rows in chunks
        ]
        # Chunks are appended in completion order, hence the
        # non-deterministic row ordering noted in the docstring.
        for future in as_completed(futures):
            chunk = future.result()
            for key, values in results.items():
                values.extend(chunk[key])
            # Drop our reference before blocking on the next future.
            del chunk

    results = pd.DataFrame(results)

    for name in schema:
        column = schema[name]
        typ = column['type']
        if column['is_a_set']:
            # TODO: Nested DF are also caught here. Open issue to fix
            results[name] = results[name].map(
                lambda x: [typ(i) for i in x["Item"]])
        else:
            try:
                results[name] = results[name].astype(typ)
            except OverflowError:
                # An int that is horrifically long cannot be converted
                # properly. Use float instead
                results[name] = results[name].astype('float')

    return results