# Source code for hpycc.spray

"""
The module contains functions to send files to HPCC.

Functions
---------
- `spray_file` -- Spray a given csv or pandas DataFrame to HPCC.

"""
from concurrent.futures import ThreadPoolExecutor, wait
import warnings

import pandas as pd

from hpycc.delete import delete_logical_file
from hpycc.utils.filechunker import make_chunks


def _spray_stringified_data(connection, data, record_set, logical_file,
                            overwrite, delete_workunit):
    """
    Spray stringified data to a HPCC logical file. To generate the
    stringified data and recordset, see `stringify_rows()` &
    `make_record_set()`

    Parameters
    ----------
    connection: `Connection`
        HPCC Connection instance, see also `Connection`.
    data: str
        Stringified data generated by `stringify_rows()`.
    record_set: str
        Recordset generated by `make_record_set()`.
    logical_file: str
        Logical file name on THOR.
    overwrite: bool
        Should the file overwrite any pre-existing logical file.
    delete_workunit: bool
        Delete the workunit once completed

    Returns
    -------
    None
    """

    script_content = ("a := DATASET([{}], {{{}}});\nOUTPUT(a, ,'{}' , "
                      "EXPIRE(1)").format(
        data, record_set, logical_file)

    if overwrite:
        script_content += ", OVERWRITE"
    script_content += ");"

    connection.run_ecl_string(script_content, True, delete_workunit,
                              stored=None)


def _get_type(typ):
    """
    Return the HPCC data type equivalent of a pandas/ numpy dtype.

    Parameters
    ----------
    typ: dtype
        Numpy or pandas dtype.

    Returns
    -------
    type: string
        ECL data type.
    """
    typ = str(typ)
    #  TODO: Can we make this work again? Non-nullable data types are a pain!
    if 'float' in typ:
        # return 'DECIMAL32_12'
        pass
    elif 'int' in typ:
        # return 'INTEGER'
        pass
    elif 'bool' in typ:
        # return 'BOOLEAN'
        pass
    else:
        # return 'STRING'
        pass

    return 'STRING'


def _stringify_rows(df, start_row, num_rows):
    """
    Return rows of a DataFrame as a HPCC ready string. Note: this ignores the
    index.

    Rows are selected by position: the slice starts at the 0-based row
    position `start_row` and holds up to `num_rows` rows, whatever the index
    labels are. Each row becomes ``{v1,v2,...}`` and rows are joined with
    commas. STRING-typed values are single-quoted, with backslashes and
    single quotes backslash-escaped. Missing values become ``''`` (STRING
    columns) or ``0`` (other columns). Non-ASCII characters are dropped.

    Parameters
    ----------
    df: pd.DataFrame
        DataFrame to stringify.
    start_row: int
        0-based position of the first row to include.
    num_rows: int
        Number of rows to include.

    Returns
    -------
    str
        ECL ready string of the slice.
    """
    # Positional (iloc) slicing: the chunk offsets are row positions, so a
    # label-based .loc slice gives wrong rows for any non-RangeIndex frame.
    sliced_df = df.iloc[start_row:start_row + num_rows]

    # Convert each column by position into a list of cell strings instead of
    # assigning back into the slice. This avoids writing into a view of
    # `df` and also works when column labels are duplicated.
    column_cells = []
    for position in range(sliced_df.shape[1]):
        column = sliced_df.iloc[:, position]
        if _get_type(column.dtype) == "STRING":
            # Escape backslashes before quotes; otherwise a value ending in
            # "\" would escape the closing quote of the ECL string literal.
            escaped = (column.fillna("").astype(str)
                       .str.replace("\\", "\\\\", regex=False)
                       .str.replace("'", "\\'", regex=False))
            column = "'" + escaped + "'"
        else:
            column = column.fillna(0)
        column_cells.append(column.astype(str).tolist())

    # Transpose the per-column lists back into rows.
    rows = zip(*column_cells)
    return ','.join(
        "{" + ','.join(row) + "}" for row in rows
    ).encode('ascii', 'ignore').decode()


def spray_file(connection, source_file, logical_file, overwrite=False,
               expire=None, chunk_size=100000, max_workers=5,
               delete_workunit=True):
    """
    Spray a file to a HPCC logical file, bypassing the landing zone.

    The data is split into chunks of `chunk_size` rows. Each chunk is sprayed
    to its own temporary logical file, using up to `max_workers` threads.
    The temporary files are then concatenated into `logical_file` and
    deleted.

    Parameters
    ----------
    connection: `Connection`
        HPCC Connection instance, see also `Connection`.
    source_file: str, pd.DataFrame
        A pandas DataFrame or the path to a csv.
    logical_file: str
        Logical file name on THOR.
    overwrite: bool, optional
        Should the file overwrite any pre-existing logical file. False by
        default.
    expire: int, optional
        How long (days) until the produced logical file expires? None (ie no
        expiry) by default.
    chunk_size: int, optional
        Size of chunks to use when spraying file. 100000 by default.
    max_workers: int, optional
        Number of concurrent threads to use when spraying. Warning: too many
        will likely cause either your machine or your cluster to crash! 5 by
        default.
    delete_workunit: bool, optional
        Delete workunit once completed. True by default.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If `source_file` is neither a pandas DataFrame nor a str.

    Any exception raised while spraying a chunk is re-raised before the
    concatenation step.
    """
    if isinstance(source_file, pd.DataFrame):
        df = source_file
    elif isinstance(source_file, str):
        df = pd.read_csv(source_file, encoding='latin')
    else:
        raise TypeError(
            "source_file must be a pandas DataFrame or a path to a csv, "
            "got {}".format(type(source_file).__name__))

    if not logical_file.startswith('~'):
        # Constructing a Warning instance does not emit it; warnings.warn
        # does. startswith() also copes with an empty name.
        warnings.warn(
            "Your Logical file name ({}) did not start with ~ so may not be "
            "sprayed to root".format(logical_file), SyntaxWarning)

    record_set = _make_record_set(df)
    # Materialise the chunks because they are iterated twice below. If
    # make_chunks returned a one-shot iterator, the generator and the name
    # list would share it, and no chunk would actually be sprayed.
    chunks = list(make_chunks(len(df), chunk_size=chunk_size))

    print('Any unicode characters will be converted to ASCII, not saying you '
          'have any, just warning you! If you are getting odd errors you may '
          'want to deal with your UTF before spraying.')

    # Lazily stringify so that each chunk is built as it is submitted.
    stringified_rows = (_stringify_rows(df, start_row, num_rows)
                        for start_row, num_rows in chunks)
    target_names = ["~TEMPHPYCC::{}from{}to{}".format(
        logical_file.replace("~", ""), start_row, start_row + num_rows)
        for start_row, num_rows in chunks]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(_spray_stringified_data, connection, rows,
                            record_set, name, overwrite, delete_workunit)
            for rows, name in zip(stringified_rows, target_names)]
        wait(futures)
        # result() re-raises any error from a failed chunk, so we never
        # concatenate an incomplete set of temporary files.
        for future in futures:
            future.result()

    _concatenate_logical_files(connection, target_names, logical_file,
                               record_set, overwrite, expire, delete_workunit)
    for tmp in target_names:
        delete_logical_file(connection, tmp, delete_workunit)
def _make_record_set(df): """ Make an ECL recordset from a DataFrame. Parameters ---------- df: pd.DataFrame DataFrame to make recordset from. Returns ------- record_set: string String recordset. """ record_set = ";".join([" ".join((_get_type(dtype), col)) for col, dtype in df.dtypes.to_dict().items()]) return record_set def _concatenate_logical_files(connection, to_concat, logical_file, record_set, overwrite, expire, delete_workunit): """ Concatenate a list of logical files (with the same recordset) into a single logical file. Parameters ---------- connection: `Connection` HPCC Connection instance, see also `Connection`. to_concat: list, iterable. Iterable of pre-existing logical file names to concatenate. logical_file: str Logical file name to concatenate into. record_set: str Common recordset of all logical files, see `make_record_set()`. overwrite: bool Should the file overwrite any pre-existing logical file. delete_workunit: bool Delete workunit once completed. expire: int How long (days) until the produced logical file expires? Returns ------- None """ read_files = ["DATASET('{}', {{{}}}, THOR)".format( nam, record_set) for nam in to_concat] read_files = '+\n'.join(read_files) script = "a := {};\nOUTPUT(a, ,'{}' " if overwrite: script += ", OVERWRITE" if expire: script += ", EXPIRE({})".format(expire) script += ");" script = script.format(read_files, logical_file) connection.run_ecl_string(script, True, delete_workunit=delete_workunit, stored=None) if __name__ == '__main__': from hpycc.connection import Connection import pandas as pd from hpycc import spray_file con = Connection('hpcc', password="password", server="localhost", port=8010, repo='C:\z\dapper') file_name = "https://gist.githubusercontent.com/jncraton" \ "/68beb88e6027d9321373/raw" \ "/381dcf8c0d4534d420d2488b9c60b1204c9f4363/starwars.csv" df = pd.read_csv(file_name) df.head() spray_file(con, df, '~ROB::TEMP::SWCOLLECTABLES.CSV', expire=1, delete_workunit=False)