Source code for SciFiReaders.readers.microscopy.spm.afm.pifm

# -*- coding: utf-8 -*-
"""
Created on Mon Sep 13 21:46:14 2021

@author: Raj
"""

import sidpy as sid
from sidpy.sid import Reader
from sidpy.sid import Dimension

import os
import numpy as np
import h5py

from pyNSID.io.hdf_io import write_nsid_dataset
from pyNSID.io.hdf_io import create_indexed_group, write_simple_attrs

[docs]class PiFMTranslator(Reader): """ Class that writes images, spectrograms, point spectra and associated ancillary data sets to h5 file in pyUSID data structure. """
[docs] def read(self ): """ Parameters ---------- file_path : String / unicode Absolute path of the .ibw file verbose : Boolean (Optional) Whether or not to show print statements for debugging Returns ------- sidpy.Dataset : List of sidpy.Dataset objects. Image layers are saved as separate Dataset objects """ self.get_path() self.read_anfatec_params() self.read_file_desc() self.read_spectrograms() self.read_imgs() self.read_spectra() self.datasets = self.make_datasets() return self.datasets
[docs] def create_h5(self, append_path='', overwrite=False): """ Writes a new HDF5 file with the translated data append_path : string (Optional) h5_file to add these data to, must be a path to the h5_file on disk overwrite : bool (optional, default=False) If True, will overwrite an existing .h5 file of the same name """ self.create_hdf5_file(append_path, overwrite) self.write_datasets_hdf5() return
[docs] def get_path(self): """writes full path, directory, and file name as attributes to class""" self.path = self._input_file_path full_path = os.path.realpath(self.path) directory = os.path.dirname(full_path) # file name basename = os.path.basename(self.path) self.full_path = full_path self.directory = directory self.basename = basename
[docs] def read_anfatec_params(self): """reads the scan parameters and writes them to a dictionary""" params_dictionary = {} params = True with open(self.path, 'r', encoding="ISO-8859-1") as f: for line in f: if params: sline = [val.strip() for val in line.split(':')] if len(sline) == 2 and sline[0][0] != ';': params_dictionary[sline[0]] = sline[1] #in ANFATEC parameter files, all attributes are written before file references. if sline[0].startswith('FileDesc'): params = False f.close() self.params_dictionary = params_dictionary self.x_len, self.y_len = int(params_dictionary['xPixel']), int(params_dictionary['yPixel'])
[docs] def read_file_desc(self): """reads spectrogram, image, and spectra file descriptions and stores all to dictionary where the key:value pairs are filename:[all descriptors]""" spectrogram_desc = {} img_desc = {} spectrum_desc = {} pspectrum_desc = {} with open(self.path,'r', encoding="ISO-8859-1") as f: lines = f.readlines() for index, line in enumerate(lines): sline = [val.strip() for val in line.split(':')] #if true, then file describes image. if sline[0].startswith('FileDescBegin'): no_descriptors = 5 file_desc = [] for i in range(no_descriptors): line_desc = [val.strip() for val in lines[index+i+1].split(':')] file_desc.append(line_desc[1]) #img_desc['filename'] = caption, scale, physical unit, offset img_desc[file_desc[0]] = file_desc[1:] #if true, file describes spectrogram (ie hyperspectral image) if sline[0].startswith('FileDesc2Begin'): no_descriptors = 10 file_desc = [] for i in range(no_descriptors): line_desc = [val.strip() for val in lines[index+i+1].split(':')] file_desc.append(line_desc[1]) #caption, bytes perpixel, scale, physical unit, offset, offset, datatype, bytes per reading #filename wavelengths, phys units wavelengths. spectrogram_desc[file_desc[0]] = file_desc[1:] if sline[0].startswith('AFMSpectrumDescBegin'): file_desc = [] line_desc = [val.strip() for val in lines[index+1].split(':')][1] if 'powerspectrum' in line_desc: no_descriptors = 2 for i in range(no_descriptors): line_desc = [val.strip() for val in lines[index+i+1].split(':')] file_desc.append(line_desc[1]) #file name, position x, position y pspectrum_desc[file_desc[0]] = file_desc[1:] else: no_descriptors = 7 for i in range(no_descriptors): line_desc = [val.strip() for val in lines[index+i+1].split(':')] file_desc.append(line_desc[1]) #file name, position x, position y spectrum_desc[file_desc[0]] = file_desc[1:] f.close() self.img_desc = img_desc self.spectrogram_desc = spectrogram_desc self.spectrum_desc = spectrum_desc self.pspectrum_desc = pspectrum_desc
[docs] def read_spectrograms(self): """reads spectrograms, associated spectral values, and saves them in two dictionaries""" spectrograms = {} spectrogram_spec_vals = {} for file_name, descriptors in self.spectrogram_desc.items(): spec_vals_i = np.loadtxt(os.path.join(self.directory, file_name.strip('.int') + 'Wavelengths.txt')) #if true, data is acquired with polarizer, with an attenuation data column if np.array(spec_vals_i).ndim == 2: spectrogram_spec_vals[file_name] = spec_vals_i[:, 0] attenuation = {} attenuation[file_name] = spec_vals_i[:, 1] self.attenuation = attenuation else: spectrogram_spec_vals[file_name] = spec_vals_i #load and save spectrograms spectrogram_i = np.fromfile(os.path.join(self.directory, file_name), dtype='i4') spectrograms[file_name] = np.zeros((self.x_len, self.y_len, len(spec_vals_i))) for y, line in enumerate(np.split(spectrogram_i, self.y_len)): for x, pt_spectrum in enumerate(np.split(line, self.x_len)): spectrograms[file_name][x, y, :] = pt_spectrum * float(descriptors[2]) self.spectrograms = spectrograms self.spectrogram_spec_vals = spectrogram_spec_vals
[docs] def read_imgs(self): """reads images and saves to dictionary""" imgs = {} for file_name, descriptors in self.img_desc.items(): img_i = np.fromfile(os.path.join(self.directory, file_name), dtype='i4') imgs[file_name] = np.zeros((self.x_len, self.y_len)) for y, line in enumerate(np.split(img_i, self.y_len)): for x, pixel in enumerate(np.split(line, self.x_len)): imgs[file_name][x, y] = pixel * float(descriptors[1]) self.imgs = imgs
[docs] def read_spectra(self): """reads all point spectra and saves to dictionary""" spectra = {} spectra_spec_vals = {} spectra_x_y_dim_name = {} for file_name, descriptors in self.spectrum_desc.items(): spectrum_f = np.loadtxt(os.path.join(self.directory, file_name), skiprows=1) spectra_spec_vals[file_name] = spectrum_f[:, 0] spectra[file_name] = spectrum_f[:,1] with open(os.path.join(self.directory, file_name)) as f: spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t') for file_name, descriptors in self.pspectrum_desc.items(): spectrum_f = np.loadtxt(os.path.join(self.directory, file_name), skiprows=1) spectra_spec_vals[file_name] = spectrum_f[:, 0] spectra[file_name] = spectrum_f[:,1] with open(os.path.join(self.directory, file_name)) as f: spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t') self.spectra = spectra self.spectra_spec_vals = spectra_spec_vals self.spectra_x_y_dim_name = spectra_x_y_dim_name
def make_datasets(self): datasets = [] self.make_dimensions() # Spectrograms if bool(self.spectrogram_desc): for spectrogram_f, descriptors in self.spectrogram_desc.items(): # channel_i = create_indexed_group(self.h5_meas_grp, 'Channel_') spec_vals_i = self.spectrogram_spec_vals[spectrogram_f] spectrogram_data = self.spectrograms[spectrogram_f] dset = sid.Dataset.from_array(spectrogram_data, name=descriptors[0]) dset.data_type = 'Spectrogram' dset.set_dimension(0, self.dim0) dset.set_dimension(1, self.dim0) # spectrogram_spec_dims = Dimension('Wavelength', descriptors[8], spec_vals_i) spectrogram_dims = Dimension(values=spec_vals_i, name='Spectrogram', units=descriptors[3], quantity='Wavelength', type='spectral' ) dset.set_dimension(2, spectrogram_dims) dset.metadata = {'Caption': descriptors[0], 'Bytes_Per_Pixel': descriptors[1], 'Scale': descriptors[2], 'Physical_Units': descriptors[3], 'Offset': descriptors[4], 'Datatype': descriptors[5], 'Bytes_Per_Reading': descriptors[6], 'Wavelength_File': descriptors[7], 'Wavelength_Units': descriptors[8]} datasets.append(dset) # Images if bool(self.img_desc): for img_f, descriptors in self.img_desc.items(): img_data = self.imgs[img_f] dset = sid.Dataset.from_array(img_data, name = descriptors[0]) dset.data_type = 'Image' dset.set_dimension(0, self.dim0) dset.set_dimension(1, self.dim1) dset.units = descriptors[2] dset.quantity = descriptors[0] dset.metadata = {'Caption': descriptors[0], 'Scale': descriptors[1], 'Physical_Units': descriptors[2], 'Offset': descriptors[3]} datasets.append(dset) # Spectra if bool(self.spectrum_desc): for spec_f, descriptors in self.spectrum_desc.items(): #create new measurement group for each spectrum x_name = self.spectra_x_y_dim_name[spec_f][0].split(' ')[0] x_unit = self.spectra_x_y_dim_name[spec_f][0].split(' ')[1] y_name = self.spectra_x_y_dim_name[spec_f][1].split(' ')[0] y_unit = self.spectra_x_y_dim_name[spec_f][1].split(' ')[1] dset = sid.Dataset.from_array(self.spectra[spec_f], name = 'Raw_Spectrum') dset.set_dimension(0, Dimension(np.array([float(descriptors[1])]), name='X',units=self.params_dictionary['XPhysUnit'].replace('\xb5','u'), quantity = 'X_position')) dset.set_dimension(1, Dimension(np.array([float(descriptors[2])]), name='Y',units=self.params_dictionary['YPhysUnit'].replace('\xb5','u'), quantity = 'Y_position')) dset.data_type = 'Spectrum' dset.units = y_unit dset.quantity = y_name spectra_dims = Dimension(values=self.spectra_spec_vals[spec_f], name='Wavelength', units=x_unit, quantity=x_name, type='spectral' ) dset.set_dimension(2, spectra_dims) dset.metadata = {'XLoc': descriptors[1], 'YLoc': descriptors[2]} datasets.append(dset) # Power Spectra if bool(self.pspectrum_desc): for spec_f, descriptors in self.pspectrum_desc.items(): #create new measurement group for each spectrum x_name = self.spectra_x_y_dim_name[spec_f][0].split(' ')[0] x_unit = self.spectra_x_y_dim_name[spec_f][0].split(' ')[1] y_name = self.spectra_x_y_dim_name[spec_f][1].split(' ')[0] y_unit = self.spectra_x_y_dim_name[spec_f][1].split(' ')[1] dset = sid.Dataset.from_array(self.spectra[spec_f], name = 'Power_Spectrum') dset.set_dimension(0, Dimension(np.array([0]), name='X',units=self.params_dictionary['XPhysUnit'].replace('\xb5','u'), quantity = 'X_position')) dset.set_dimension(1, Dimension(np.array([0]), name='Y',units=self.params_dictionary['YPhysUnit'].replace('\xb5','u'), quantity = 'Y_position')) dset.data_type = 'Spectrum' dset.units = y_unit dset.quantity = y_name spectra_dims = Dimension(values=self.spectra_spec_vals[spec_f], name='Wavelength', units=x_unit, quantity=x_name, type='spectral' ) dset.set_dimension(2, spectra_dims) dset.metadata = {'XLoc': 0, 'YLoc': 0} datasets.append(dset) return datasets def make_dimensions(self): x_range = float(self.params_dictionary['XScanRange']) y_range = float(self.params_dictionary['YScanRange']) x_center = float(self.params_dictionary['xCenter']) y_center = float(self.params_dictionary['yCenter']) x_start = x_center-(x_range/2); x_end = x_center+(x_range/2) y_start = y_center-(y_range/2); y_end = y_center+(y_range/2) dx = x_range/self.x_len dy = y_range/self.y_len #assumes y scan direction:down; scan angle: 0 deg y_linspace = -np.arange(y_start, y_end, step=dy) x_linspace = np.arange(x_start, x_end, step=dx) qtyx = self.params_dictionary['XPhysUnit'].replace('\xb5', 'u') qtyy = self.params_dictionary['YPhysUnit'].replace('\xb5', 'u') self.dim0 = Dimension(x_linspace, name = 'x', units = qtyx, dimension_type = 'spatial', quantity='Length') self.dim1 = Dimension(y_linspace, name = 'y', units = qtyy, dimension_type = 'spatial', quantity='Length') # self.pos_ind, self.pos_val, self.pos_dims = pos_ind, pos_val, pos_dims return # HDF5 creation
[docs] def create_hdf5_file(self, append_path='', overwrite=False): """ Sets up the HDF5 file for writing append_path : string (Optional) h5_file to add these data to, must be a path to the h5_file on disk overwrite : bool (optional, default=False) If True, will overwrite an existing .h5 file of the same name """ if not append_path: h5_path = os.path.join(self.directory, self.basename.replace('.txt', '.h5')) if os.path.exists(h5_path): if not overwrite: raise FileExistsError('This file already exists). Set attribute overwrite to True') else: print('Overwriting file', h5_path) #os.remove(h5_path) self.h5_f = h5py.File(h5_path, mode='w') else: if not os.path.exists(append_path): raise Exception('File does not exist. Check pathname.') self.h5_f = h5py.File(append_path, mode='r+') self.h5_img_grp = create_indexed_group(self.h5_f, "Images") self.h5_spectra_grp = create_indexed_group(self.h5_f, "Spectra") self.h5_spectrogram_grp = create_indexed_group(self.h5_f, "Spectrogram") write_simple_attrs(self.h5_img_grp, self.params_dictionary) write_simple_attrs(self.h5_spectra_grp, self.params_dictionary) write_simple_attrs(self.h5_spectrogram_grp, self.params_dictionary) return
[docs] def write_datasets_hdf5(self): """ Writes the datasets as pyNSID datasets to the HDF5 file""" for dset in self.datasets: if 'IMAGE' in dset.data_type.name: write_nsid_dataset(dset, self.h5_img_grp) elif 'SPECTRUM' in dset.data_type.name: write_nsid_dataset(dset, self.h5_spectra_grp) else: write_nsid_dataset(dset, self.h5_spectrogram_grp) self.h5_f.file.close() return