""" Collection of generic python helper programs """ import numpy as np import os import h5py as h5 from pdb import set_trace import ctypes as c import time def dir(filename): """Return the (top-level) directory of a given path.""" return '/'.join((filename.split('/'))[:-1]) + '/' def file_to_dir(filename): """ Convenience alias of dir(), for backwards-compatibility.""" return dir(filename) def write_hdf5(arr, outloc, name, new=False, comment = None, update = True, replace = True, compression = None): """ Write a numpy array to an HDF5 dataset. Args: arr (array): The array to write to an HDF5 file. outloc (string): The file name to write to. name (string): The name of the data set to write, including (possibly) its containing group(s). new (bool, optional): Rename a possibly existing file with the target name to '[outloc].old'. Defaults to False. comment (string, optional): String comment to describe the content of the data set; written as an HDF5 attribute. If None (default), no comment is written. update (bool, optional): first check whether the data set already exists, and update it if so (default). If False, a new data set is created, which results in an error if it already exists. replace (bool, optional): if a data set is updated, the old one is first deleted (default). If False, in-place update is done, which will give an error if the old and new data sets are of different shape. compression (string, optional): compression to be applied to new data sets ('gzip' or 'lzf', default None). Returns: None """ # Move pre-existing file out of the way if desired: if new: if os.path.isfile(outloc): os.rename(outloc, outloc+'.old') f = h5.File(outloc, 'a') # If update is enabled, check whether data set already exists: if update and name in f: print("Dataset '" + name + "' already exists, updating...") if replace: del f[name] else: dset = f[name] if not name in f: dset = f.create_dataset(name, arr.shape, dtype = arr.dtype, compression = compression) # Actually write array to the data set, which now definitely exists. dset[...] = arr # Write comment if desired (update it if it already exists): if comment is not None: if 'Comment' in dset.attrs.keys(): dset.attrs.modify('Comment', np.string_(comment)) else: dset.attrs.create('Comment', np.string_(comment)) f.close() return def write_hdf5_attribute(filename, container, att_name, value, new = False, group = True, update = True): """ Write a variable to an HDF5 attribute. Args: filename (string): The name of the file to write to. container (string): The data set or group to attach the attribute to. att_name (string): The name of the attribute. value (scalar, array, or np.string_([string])): Variable to write. new (bool, optional): Move a possibly pre-existing file to [filename].old and start a new file containing only this attribute. Defaults to False. group (bool, optional): If container does not exist, create a new group (True) or data set (False). Defaults to True. update (bool, optional): first check whether the attribute already exists, and update it if so (default). If False, a new attribute is created, which results in an error if it already exists. update (bool, optional): If the Returns: None """ if value is None: value = np.nan if isinstance(value, str): value = np.string_(value) # Move pre-existing file out of the way if desired: if new: if os.path.isfile(filename): os.rename(filename, filename+'.old') f = h5.File(filename, 'a') # Create appropriate container if it does not yet exist: if container not in f.keys(): if group: f.create_group(container) else: f.create_dataset(container) # Modify existing attribute, or create new if it does not yet exist: if att_name in f[container].attrs.keys() and update: f[container].attrs.modify(att_name, value) else: f[container].attrs.create(att_name, value) return def read_hdf5_attribute(filename, container, att_name): """Read an HDF5 attribute from a specified file and container.""" try: f = h5.File(filename, 'r') att = f[container].attrs[att_name] f.close() except: raise Exception("Did not find specified attribute '" + att_name + "' in container '" + container + "'...") return att def hdf5_attrs_to_dict(filename, container): """Read all attributes from a specified file and container into a dict.""" try: f = h5.File(filename, 'r') cont = f[container] attrs_tuple = list(cont.attrs.items()) except: raise Exception("Could not retrieve attributes from container {:d}" .format(container)) att_dict = {} for att in attrs_tuple: att_dict[att[0]] = att[1] return att_dict def check_hdf5_dataset(filename, dataset): """Check whether a specified dataset exists in an HDF5 file""" f = h5.File(filename, 'r') try: d = f[dataset] except: return False return isinstance(d, h5.Dataset) def check_hdf5_group(filename, group): """Check whether a specified HDF5 group exists""" f = h5.File(filename, 'r') try: d = f[group] except: return False return isinstance(d, h5.Group) def read_hdf5(file, var, first=None, stop=None): """ Read one dataset from an HDF5 file. Optionally, only a section of the data set may be read. Args: file (string): HDF5 file to read from. var (string): Dataset to read, including containing groups (if so). first (int, optional): First element to read, defaults to zero. stop (int, optional): Beyond-last element to read, defaults to N. Returns: array (type and dimensions depends on data set). """ f = h5.File(file, 'r') dataset = f[var] dtype = f[var].dtype dshape = f[var].shape if first is None: first = 0 if stop is None: stop = dshape[0] if first == 0 and stop == dshape[0]: outshape = f[var].shape data_out = np.empty(f[var].shape, f[var].dtype) else: outshape = np.zeros(len(f[var].shape), dtype = int) outshape[0] = stop-first outshape[1:] = dshape[1:] data_out = np.empty(outshape, dtype) if f[var].size: if first == 0 and stop == dshape[0]: f[var].read_direct(data_out) else: data_out = f[var][first:stop, ...] f.close() return data_out def list_hdf5_datasets(file, group = None): """ Return a list of all datasets in a HDF5 group. Args: file (string): The filename to query. group (string, optional): The group in which to look for data sets. Returns: list of strings: Names of data sets. """ hf = h5.File(file, 'r') if group is not None: hf = hf[group] return [key for key in hf.keys()] def is_sorted(arr): """Test whether input array is sorted in ascending order.""" return all(arr[i] <= arr[i+1] for i in range(len(arr)-1)) def ckatamaran_search(a, b): """ Sped-up version of katamaran-search which does the loop in C. Note that the file path to ckat.so may need updating on systems other than VIRGO. Args: a (array): Elements to locate. Must be unique and sorted. b (array): Reference array. Must also be unique and sorted. Returns: locs_a (int array): indices of a in b (-1 if not found). """ if len(a) == 0: return np.zeros(0, dtype = np.int32)-1 locs_a = np.zeros(len(a), dtype = np.int64)-1 if len(b) == 0: return locs_a ObjectFile = "/u/ybahe/ANALYSIS/ckat.so" lib = c.cdll.LoadLibrary(ObjectFile) ckat = lib.ckat a_for_c = a.astype(np.int64) b_for_c = b.astype(np.int64) a_p = a_for_c.ctypes.data_as(c.c_void_p) b_p = b_for_c.ctypes.data_as(c.c_void_p) na_c = c.c_long(len(a)) nb_c = c.c_long(len(b)) locs_a_p = locs_a.ctypes.data_as(c.c_void_p) myargv = c.c_void_p * 5 argv = myargv(a_p, b_p, c.addressof(na_c), c.addressof(nb_c), locs_a_p) succ = ckat(5, argv) return locs_a def katamaran_search(a, b): """ Perform a katamaran search to locate elements of a in b. This assumes that the elements in a and b are unique, and that a and b are sorted. Args: a (array): Elements to locate. Must be unique and sorted. b (array): Reference array. Must also be unique and sorted. Returns: locs_a (int array): indices of a in b (-1 if not found). """ if len(a) == 0: return np.zeros(0, dtype = int)-1 locs_a = np.zeros(len(a), dtype = int)-1 if len(b) == 0: return locs_a ind_a = ind_b = 0 len_a = len(a) len_b = len(b) val_a = a[ind_a] val_b = b[ind_b] while(True): if val_a > val_b: ind_b += 1 if ind_b >= len_b: break val_b = b[ind_b] continue if val_b > val_a: ind_a += 1 if ind_a >= len_a: break val_a = a[ind_a] continue if val_a == val_b: locs_a[ind_a] = ind_b ind_a += 1 ind_b += 1 if ind_a >= len_a or ind_b >= len_b: break val_a = a[ind_a] val_b = b[ind_b] continue return locs_a def create_reverse_list(ids, delete_ids = False, cut = False, maxval = None): """ Create a reverse index list from a (unique) list of IDs. This gives the index for each ID value, and is thus the inverse of the input list (which gives the ID for an index): Input ID list: --> Reverse index list: || 4 | 0 | 5 | 1 || --> || 1 | 3 | -1 | -1 | 0 | 2 || Args: ids (int array): Input ID list to invert. Assumed to be unique. delete_ids (bool, optional): Delete input ID list after building reverse list. Defaults to False. cut (bool, optional): Ignore negative input IDs. Defaults to False. maxval (int, optional): Build reverse list with at least maxval+1 entries (i.e. that can be indexed by values up to maxval), even if this exceeds the maximum input ID. Returns: revlist (int array): The reverse index list. """ # If there is no input, return empty array if len(ids) == 0 and maxval is None: return np.zeros(0, dtype = np.int32) if len(ids) > 0: maxid = ids.max() else: maxid = -1 if maxval is not None: if maxval > maxid: maxid = maxval if len(ids) > 2e9: dtype = np.int64 else: dtype = np.int32 if cut: ind_good = np.nonzero(ids >= 0)[0] ngood = len(ind_good) else: ind_good = np.arange(ids.shape[0], dtype = int) ngood = ids.shape[0] revlist = np.zeros(np.int64(maxid+1), dtype = dtype)-1 revlist[ids[ind_good]] = ind_good if delete_ids: del ids return revlist def find_id_indices(ids, reflist, max_direct = 1e10, min_c = 1e5): """ Find and return the locations of IDs in a reference list. The function can handle arbitrarily high ID values. If the maximum value is below an (adjustable) threshold, the lookup is performed via an explicit reverse-lookup list. Otherwise, both input and reference lists are sorted and compared via a "Katamaran-search". Note that using a reverse-lookup list is typically much faster, but may use substantial amounts of memory. For small input lists, the sort-and-search approach may be faster, which can be enabled by setting max_direct = 0. Args: ids (int array): Array of IDs whose indices should be returned. reflist (int array): Reference list of IDs, assumed to be unique. The function will search for each input ID in this array. max_direct (int): Maximum ID value (in input or ref list) for which the direct (inverse lookup list) method will be used. Defaults to 1e10. Note that this will consume at least max_direct * 8 GB of memory. min_c (int): Minimum length of ID list (input or ref) for which a Katamaran-search is outsourced to the C-library. Defaults to 1e5. Irrelevant if a direct search is performed. Returns: ind (int array): The index in reflist for each input ID (-1 if it could not be located at all). ind_match (int array): The input ID indices that could be matched. """ maxid_in = np.max(ids) maxid_ref = np.max(reflist) if maxid_in > max_direct or maxid_ref > max_direct: use_direct = False else: use_direct = True if use_direct: revlist = create_reverse_list(reflist, maxval = maxid_in) ind = revlist[ids] ind_match = np.nonzero(ind >= 0)[0] else: # Need to identify matching IDs in sh_ids by brute force tstart = time.time() sorter_in = np.argsort(ids) sorter_ref = np.argsort(reflist) tsort = time.time()-tstart tstart = time.time() if len(ids) > min_c or len(reflist) >= min_c: ind_in_sorted_ref = ckatamaran_search(ids[sorter_in], reflist[sorter_ref]) else: ind_in_sorted_ref = katamaran_search(ids[sorter_in], reflist[sorter_ref]) ind_prematch_in = np.nonzero(ind_in_sorted_ref >= 0)[0] ind = np.zeros(len(ids), dtype = int)-1 ind[sorter_in[ind_prematch_in]] = sorter_ref[ind_in_sorted_ref[ind_prematch_in]] ind_match = sorter_in[ind_prematch_in] tkat = time.time()-tstart print("Sorting took {:.3f} sec." .format(tsort)) print("Kat-search took {:.3f} sec." .format(tkat)) return ind, ind_match def sum_by_index(quant, index, numBin=None): """ Computes the sum of a quantity for each supplied index. In other words, for each index value from 0 to max(index), it finds which elements have this index value, and then sums their quantity values. Args: quant : ndarray (float): A [N] element array containing the to-be-summed values. index : ndarray (int): A [N] element array containing the indices to sum over. The ith element of index corresponds to the ith element of quant. numBin : int (optional): If specified, the minimum number of elements in the output array, provided it is higher than max(index)+1. Returns: sums : ndarray (float): A [max(index)] element array containing the sum of quantities for each index value (including non-existing ones, which are set to zero). """ max_index = np.max(index) numPt = len(index) if numBin is None: numBin = max_index+1 if numBin < max_index+1: numBin = max_index+1 quant_binned = np.zeros(numBin, dtype = np.float64) quant = quant.astype(np.float64) index = index.astype(np.int64) # *********** IMPORTANT ******************************** # This next line needs to be modified to point # to the full path of where the library has been copied. # ******************************************************* ObjectFile = "/u/ybahe/ANALYSIS/PACKAGES/lib/sum_index.so" c_numPart = c.c_long(numPt) c_nbins = c.c_long(numBin) partQuant_p = quant.ctypes.data_as(c.c_void_p) partIndex_p = index.ctypes.data_as(c.c_void_p) result_p = quant_binned.ctypes.data_as(c.c_void_p) nargs = 5 myargv = c.c_void_p * nargs argv = myargv(c.addressof(c_numPart), c.addressof(c_nbins), partQuant_p, partIndex_p, result_p) lib = c.cdll.LoadLibrary(ObjectFile) succ = lib.sum_index(nargs, argv) print("Sum of particle quant: ", np.sum(quant)) print("Sum of binned quant: ", np.sum(quant_binned)) return quant_binned def linear(x0, x1, y0, y1, flag=None): """ Compute linear interpolation coefficients for a given set of points. Parameters ---------- x0 : float First x coordinate (same for all points) x1 : float Second x coordinate (same for all points) y0 : array (float) First y coordinates of all points. y1 : array (float) Second y coordinates of all points. flag : array (int8, optional) Code that represents which coordinates are valid for each point. 0 --> only first 1 --> only second 2 --> both If None (default), it is assumed that both are valid for all points. Notes ----- It is assumed that there is at least one valid coordinate for each point. """ if flag is None: flag = np.zeros(y0.shape[0], dtype = np.int8)+2 a = np.zeros_like(y0) b = np.zeros_like(y0) ind_full = np.nonzero(flag == 2)[0] if len(ind_full) > 0: a[ind_full, ...] = (y0[ind_full, ...]-y1[ind_full, ...])/(x0-x1) b[ind_full, ...] = y1[ind_full] - x1*a[ind_full, ...] ind_0 = np.nonzero(flag == 0)[0] if len(ind_0) > 0: b[ind_0, ...] = y0[ind_0, ...] ind_1 = np.nonzero(flag == 1)[0] if len(ind_1) > 0: b[ind_1, ...] = y1[ind_0, ...] return a, b def quadratic(x1, x2, x3, y1, y2, y3, flag=None): """ Flag meanings: 0 --> only first two points good (ignore 3) 1 --> only last two points good (ignore 1) 2 --> all points good (default if None) More complex cases are not yet considered. """ a = np.zeros_like(y1) b = np.zeros_like(y1) c = np.zeros_like(y1) # Complex case first: full quadratic reconstruction if flag is None: flag = np.zeros(y1.shape[0], dtype = np.int8)+2 ind_full = np.nonzero(flag == 2)[0] if len(ind_full) > 0: a[ind_full, ...] = ( (y1-y3+(x3-x1)/(x2-x3)*(y2-y3)) / (x1**2-x3**2-(x1-x3)*(x2+x3)))[ind_full, ...] b[ind_full, ...] = ((y2-y3)/(x2-x3) - a*(x2+x3))[ind_full, ...] c[ind_full, ...] = (y3-a*x3**2-b*x3)[ind_full, ...] # Linear fit (a=0) for first two points: ind_0 = np.nonzero(flag == 0)[0] if len(ind_0) > 0: b[ind_0, ...] = (y1[ind_0, ...]-y2[ind_0, ...])/(x1-x2) c[ind_0, ...] = y2[ind_0, ...] - x2*b[ind_0, ...] # Linear fit (a=0) for last two points: ind_1 = np.nonzero(flag == 1)[0] if len(ind_1) > 0: b[ind_1, ...] = (y2[ind_1, ...]-y3[ind_1, ...])/(x2-x3) c[ind_1, ...] = y3[ind_1, ...] - x3*b[ind_1, ...] return a, b, c def cubic_local(t1, t2, r1, r2, v1, v2): """ Reconstruct a cubic polynomial from the function values (r1, r2) and derivatives (v1, v2) at the two endpoints (t1, t2). Note that this does not yet include a facility to resort to a lower-order polynomial if any of these four boundary conditions are absent. """ a = np.zeros_like(r1) b = np.zeros_like(r1) c = np.zeros_like(r1) d = np.zeros_like(r1) a = ((r2-r1 - v2*t2 + (t1**2-2*t1*t2)*(v1-v2)/(2*(t1-t2)) + t1*v2 + (v1-v2)*t2**2/(2*(t1-t2))) / (3/2*((t1-t2)**2) * (t1+t2) -2*t2**3 - t1**3 + 3*t1*t2**2)) b = (v1-v2-3*a*(t1**2-t2**2))/(2*(t1-t2)) c = v2 - 3*a*t2**2 - 2*b*t2 d = (r1 - a*(t1**3-3*t1*t2**2 - 3/2*(t1**2-2*t1*t2)*(t2+t1)) - (t1**2-2*t1*t2)*(v1-v2)/(2*(t1-t2)) - t1*v2) return a, b, c, d def cubic(x1, x2, x3, x4, y1, y2, y3, y4, flag=None): """ Flag meanings: 0 --> only central points good (ignore 1 and 4) 1 --> only first three points good (ignore 4) 2 --> only last three points good (ignore 1) 3 --> all four points good (default if None) More complex cases are not yet considered. """ if flag is None: flag = np.zeros(y1.shape[0], dtype = np.int8)+3 a = np.zeros_like(y1) b = np.zeros_like(y1) c = np.zeros_like(y1) d = np.zeros_like(y1) # Great fun: full cubic polynomial for class-3 points ind_full = np.nonzero(flag == 3)[0] if len(ind_full) > 0: a[ind_full, ...] = ( (y1-y4 - (x1-x4)*(y3-y4)/(x3-x4) - (y2-y4 - (x2-x4)/(x3-x4)*(y3-y4)) * (x1**2-x4**2 - (x1-x4)*(x3**2-x4**2)/(x3-x4)) / (x2**2-x4**2-(x2-x4)*(x3**2-x4**2)/(x3-x4))) / (x1**3-x4**3 - (x1-x4)*(x3**3-x4**3)/(x3-x4) - (x2**3-x4**3-(x2-x4)*(x3**3-x4**3)/(x3-x4)) * (x1**2-x4**2 - (x1-x4)*(x3**2-x4**2)/(x3-x4)) / (x2**2-x4**2-(x2-x4)*(x3**2-x4**2)/(x3-x4))))[ind_full, ...] b[ind_full, ...] = ( ((y2-y4 - (x2-x4)/(x3-x4)*(y3-y4)) - a * (x2**3-x4**3-(x2-x4)*(x3**3-x4**3)/(x3-x4))) / (x2**2-x4**2-(x2-x4)*(x3**2-x4**2)/(x3-x4)))[ind_full, ...] c[ind_full, ...] = ( (y3-y4-a*(x3**3-x4**3)-b*(x3**2-x4**2))/(x3-x4))[ind_full, ...] d[ind_full, ...] = (y4-a*x4**3-b*x4**2-c*x4)[ind_full, ...] # Quadratic polynomial fit (a=0) for class-1 points (first 3) ind_1 = np.nonzero(flag == 1)[0] if len(ind_1) > 0: b[ind_1, ...] = ( (y1-y3+(x3-x1)/(x2-x3)*(y2-y3)) / (x1**2-x3**2-(x1-x3)*(x2+x3)))[ind_1, ...] c[ind_1, ...] = ((y2-y3)/(x2-x3) - b*(x2+x3))[ind_1, ...] d[ind_1, ...] = (y3-b*x3**2-c*x3)[ind_1, ...] # Quadratic polynomial fit (a=0) for class=2 points (last 3) ind_2 = np.nonzero(flag == 2)[0] if len(ind_2) > 0: b[ind_2, ...] = ( (y2-y4+(x4-x2)/(x3-x4)*(y3-y4)) / (x2**2-x4**2-(x2-x4)*(x3+x4)))[ind_2, ...] c[ind_2, ...] = ((y3-y4)/(x3-x4) - b*(x3+x4))[ind_2, ...] d[ind_2, ...] = (y4-b*x4**2-c*x4)[ind_2, ...] # Linear fit (a=0, b=0) for class=0 points (central 2 only) ind_0 = np.nonzero(flag == 0)[0] if len(ind_0) > 0: c[ind_0, ...] = (y2[ind_0, ...]-y3[ind_0, ...])/(x2-x3) d[ind_0, ...] = y3[ind_0, ...] - x3*c[ind_0, ...] return a, b, c, d # ================================================= # ==== Functions whose purpose/need is unclear ==== # ================================================= def binary_search(a, x, lo=0, hi=None): """Do something, exact purpose unclear.""" # can't use a to specify default for hi hi = hi if hi is not None else len(a) # hi defaults to len(a) pos = bisect_left(a,x,lo,hi) # find insertion position return (pos if pos != hi and a[pos] == x else -1) # don't walk off the end def match_keys(key1, key2,check_sort=False): if check_sort: if not is_sorted(key1): print("Input key1 needs to be sorted!") sys.exit() ind_trial = np.searchsorted(key1, key2) ind_clipped = np.nonzero(ind_trial >= len(key1))[0] ind_trial[ind_clipped] = len(key1)-1 matches = key1[ind_trial] ind_goodmatch = np.nonzero(matches == key2)[0] print("Found {:d} matching keys (= {:.2f} per cent of KEY1, {:.2f} per cent of KEY2)..." .format(len(ind_goodmatch), len(ind_goodmatch)/len(key1)*100.0, len(ind_goodmatch)/len(key2)*100.0)) return ind_trial[ind_goodmatch], ind_goodmatch def rotation_matrix(axis, theta): """ Return the rotation matrix associated with counterclockwise rotation about the given axis by theta radians. From http://stackoverflow.com/questions/6802577/python-rotation-of-3d-vector """ axis = np.asarray(axis) theta = np.asarray(theta) axis = axis/math.sqrt(np.dot(axis, axis)) a = math.cos(theta/2) b, c, d = -axis*math.sin(theta/2) aa, bb, cc, dd = a*a, b*b, c*c, d*d bc, ad, ac, ab, bd, cd = b*c, a*d, a*c, a*b, b*d, c*d return np.array([[aa+bb-cc-dd, 2*(bc+ad), 2*(bd-ac)], [2*(bc-ad), aa+cc-bb-dd, 2*(cd+ab)], [2*(bd+ac), 2*(cd-ab), aa+dd-bb-cc]]) def rotatebyaxis(r, axis, theta): """ Rotate points using matrix / axis formalism """ return np.dot(r,rotation_matrix(axis,theta*np.pi/180))