"""
The module contains functions to send files to HPCC.
Functions
---------
- `spray_file` -- Spray a given csv or pandas DataFrame to HPCC.
"""
from concurrent.futures import ThreadPoolExecutor, wait
import warnings

import pandas as pd

from hpycc.delete import delete_logical_file
from hpycc.utils.filechunker import make_chunks
def _spray_stringified_data(connection, data, record_set, logical_file,
overwrite, delete_workunit):
"""
Spray stringified data to a HPCC logical file. To generate the
stringified data and recordset, see `stringify_rows()` &
`make_record_set()`
Parameters
----------
connection: `Connection`
HPCC Connection instance, see also `Connection`.
data: str
Stringified data generated by `stringify_rows()`.
record_set: str
Recordset generated by `make_record_set()`.
logical_file: str
Logical file name on THOR.
overwrite: bool
Should the file overwrite any pre-existing logical file.
delete_workunit: bool
Delete the workunit once completed
Returns
-------
None
"""
script_content = ("a := DATASET([{}], {{{}}});\nOUTPUT(a, ,'{}' , "
"EXPIRE(1)").format(
data, record_set, logical_file)
if overwrite:
script_content += ", OVERWRITE"
script_content += ");"
connection.run_ecl_string(script_content, True, delete_workunit,
stored=None)
def _get_type(typ):
"""
Return the HPCC data type equivalent of a pandas/ numpy dtype.
Parameters
----------
typ: dtype
Numpy or pandas dtype.
Returns
-------
type: string
ECL data type.
"""
typ = str(typ)
# TODO: Can we make this work again? Non-nullable data types are a pain!
if 'float' in typ:
# return 'DECIMAL32_12'
pass
elif 'int' in typ:
# return 'INTEGER'
pass
elif 'bool' in typ:
# return 'BOOLEAN'
pass
else:
# return 'STRING'
pass
return 'STRING'
def _stringify_rows(df, start_row, num_rows):
    """
    Return rows of a DataFrame as a HPCC ready string. Note: this ignores the
    index.

    Rows are selected by position, so the DataFrame's index labels have
    no effect on which rows are returned. The input DataFrame is not
    modified.

    Parameters
    ----------
    df: pd.DataFrame
        DataFrame to stringify.
    start_row: int
        Zero-based position of the first row to include.
    num_rows: int
        Number of rows to include.

    Returns
    -------
    str
        ECL ready string of the slice: one ``{...}`` group per row,
        comma separated. Non-ASCII characters are dropped.
    """
    # Positional slice: a label-based .loc slice picks the wrong rows (or
    # none) when the index is not the default 0..n-1 range. The copy keeps
    # the column rewrites below away from the caller's DataFrame.
    sliced_df = df.iloc[start_row:start_row + num_rows].copy()

    for col in sliced_df.columns:
        ecl_type = _get_type(sliced_df.dtypes[col])
        if ecl_type == "STRING":
            as_text = sliced_df[col].fillna("").astype(str)
            # Escape backslashes before quotes; otherwise a value ending
            # in a backslash would escape its own closing quote.
            escaped = (as_text
                       .str.replace("\\", "\\\\", regex=False)
                       .str.replace("'", "\\'", regex=False))
            sliced_df[col] = "'" + escaped + "'"
        else:
            sliced_df[col] = sliced_df[col].fillna(0)

    rows = sliced_df.astype(str).values.tolist()
    ecl_rows = ','.join("{" + ','.join(row) + "}" for row in rows)
    # Anything outside ASCII is silently discarded.
    return ecl_rows.encode('ascii', 'ignore').decode()
def spray_file(connection, source_file, logical_file, overwrite=False,
               expire=None, chunk_size=100000, max_workers=5,
               delete_workunit=True):
    """
    Spray a file to a HPCC logical file, bypassing the landing zone.

    The data is split into chunks, each chunk is written to a temporary
    logical file (named ``~TEMPHPYCC::<name>from<a>to<b>``) on a worker
    thread, the temporary files are concatenated into `logical_file`,
    and the temporary files are then deleted.

    Parameters
    ----------
    connection: `Connection`
        HPCC Connection instance, see also `Connection`.
    source_file: str, pd.DataFrame
        A pandas DataFrame or the path to a csv.
    logical_file: str
        Logical file name on THOR.
    overwrite: bool, optional
        Should the file overwrite any pre-existing logical file.
        False by default.
    expire: int
        How long (days) until the produced logical file expires? None
        (ie no expiry) by default
    chunk_size: int, optional
        Size of chunks to use when spraying file. 100000 by
        default.
    max_workers: int, optional
        Number of concurrent threads to use when spraying.
        Warning: too many will likely cause either your machine or
        your cluster to crash! 5 by default.
    delete_workunit: bool
        Delete workunit once completed.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If `source_file` is neither a DataFrame nor a str.
    """
    if isinstance(source_file, pd.DataFrame):
        df = source_file
    elif isinstance(source_file, str):
        df = pd.read_csv(source_file, encoding='latin')
    else:
        raise TypeError(
            "source_file must be a pandas DataFrame or the path to a csv, "
            "got {}".format(type(source_file).__name__))

    # startswith() also copes with an empty name, where [0] would raise.
    if not logical_file.startswith('~'):
        # The warning object used to be built and discarded; emit it.
        warnings.warn(
            "Your Logical file name (%s) did not start with\n"
            "~ so may not be sprayed to root" % logical_file,
            SyntaxWarning, stacklevel=2)

    record_set = _make_record_set(df)
    # Materialise the chunks: they are iterated twice below, which would
    # silently spray nothing if make_chunks handed back a one-shot iterator.
    chunks = list(make_chunks(len(df), chunk_size=chunk_size))

    print('Any unicode characters will be converted to ASCII, not saying you '
          'have any, just warning you! If you are getting odd errors you may '
          'want to deal with your UTF before spraying.')

    stringified_rows = (_stringify_rows(df, start_row, num_rows)
                        for start_row, num_rows in chunks)
    target_names = ["~TEMPHPYCC::{}from{}to{}".format(
        logical_file.replace("~", ""), start_row, start_row + num_rows)
        for start_row, num_rows in chunks]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(_spray_stringified_data, connection, row,
                            record_set, name, overwrite, delete_workunit)
            for row, name in zip(stringified_rows, target_names)]
        wait(futures)
        # result() re-raises any exception raised inside a worker.
        for future in futures:
            future.result()

    _concatenate_logical_files(connection, target_names, logical_file,
                               record_set, overwrite, expire,
                               delete_workunit)

    for tmp in target_names:
        delete_logical_file(connection, tmp, delete_workunit)
def _make_record_set(df):
    """
    Build the ECL record layout describing a DataFrame's columns.

    Parameters
    ----------
    df: pd.DataFrame
        DataFrame whose columns define the layout.

    Returns
    -------
    record_set: string
        ``"<type> <column>"`` entries joined with ``;``, one entry per
        column, with the type taken from `_get_type`.
    """
    field_declarations = []
    for column_name, column_dtype in df.dtypes.to_dict().items():
        ecl_type = _get_type(column_dtype)
        field_declarations.append(" ".join((ecl_type, column_name)))
    return ";".join(field_declarations)
def _concatenate_logical_files(connection, to_concat, logical_file, record_set,
overwrite, expire, delete_workunit):
"""
Concatenate a list of logical files (with the same recordset)
into a single logical file.
Parameters
----------
connection: `Connection`
HPCC Connection instance, see also `Connection`.
to_concat: list, iterable.
Iterable of pre-existing logical file names to concatenate.
logical_file: str
Logical file name to concatenate into.
record_set: str
Common recordset of all logical files, see `make_record_set()`.
overwrite: bool
Should the file overwrite any pre-existing logical file.
delete_workunit: bool
Delete workunit once completed.
expire: int
How long (days) until the produced logical file expires?
Returns
-------
None
"""
read_files = ["DATASET('{}', {{{}}}, THOR)".format(
nam, record_set) for nam in to_concat]
read_files = '+\n'.join(read_files)
script = "a := {};\nOUTPUT(a, ,'{}' "
if overwrite:
script += ", OVERWRITE"
if expire:
script += ", EXPIRE({})".format(expire)
script += ");"
script = script.format(read_files, logical_file)
connection.run_ecl_string(script, True, delete_workunit=delete_workunit, stored=None)
if __name__ == '__main__':
    # Manual demo: download a sample csv and spray it to a local HPCC
    # instance. The connection details below are placeholders.
    from hpycc.connection import Connection

    # Raw string: '\z' and '\d' are invalid escape sequences in a normal
    # string literal.
    con = Connection('hpcc', password="password", server="localhost",
                     port=8010, repo=r'C:\z\dapper')
    file_name = "https://gist.githubusercontent.com/jncraton" \
                "/68beb88e6027d9321373/raw" \
                "/381dcf8c0d4534d420d2488b9c60b1204c9f4363/starwars.csv"
    df = pd.read_csv(file_name)
    # Uses this module's own spray_file and the module-level pandas import.
    spray_file(con, df, '~ROB::TEMP::SWCOLLECTABLES.CSV', expire=1,
               delete_workunit=False)