#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
################################################################################
# Python class for reading Nion Swift files into sidpy Dataset
# and extracting all metadata
#
# Written by Gerd Duscher, UTK 2020
#
# Works for python 3
#
################################################################################
from __future__ import division, print_function, absolute_import, unicode_literals
import json
import struct
import h5py
# from warnings import warn
import sys
import numpy as np
import os
import sidpy
# Public API of this module.
__all__ = ["NionReader", "version"]

version = '0.1beta'

# Debug verbosity: 0 = none, 1-3 = basic, 4-5 = simple, 6-10 = verbose.
debugLevel = 0

# Python 2/3 compatibility shim: expose the name ``unicode`` on Python 3.
if sys.version_info.major == 3:
    unicode = str
# ### utility functions ###
def parse_zip(fp):
    """
    Parse the zip file headers at fp.

    Parameters
    ----------
    fp : file object
        Seekable binary file from which to parse the zip archive.

    Returns
    -------
    local_files : dict
        Maps local-file-header offset -> (name bytes, data position,
        data length, crc32).
    dir_files : dict
        Maps file name (bytes) -> (central-directory header position,
        associated local-file-header position).
    eocd : tuple
        (end-of-central-directory position, first central-directory
        header position).

    Raises
    ------
    IOError
        If an unknown record signature is encountered (not a zip file).

    Notes
    -----
    This method will seek to location 0 of fp and leave fp at end of file.
    This function is copied from nionswift/nion/swift/model/NDataHandler.py.
    All zip header fields are little-endian, hence the explicit '<' in the
    struct formats (native byte order would fail on big-endian hosts).
    """
    local_files = {}
    dir_files = {}
    eocd = None
    fp.seek(0)
    while True:
        pos = fp.tell()
        signature = struct.unpack('<I', fp.read(4))[0]
        if signature == 0x04034b50:  # local file header
            fp.seek(pos + 14)
            crc32 = struct.unpack('<I', fp.read(4))[0]
            fp.seek(pos + 18)
            # compressed size; equals the stored payload length on disk
            data_len = struct.unpack('<I', fp.read(4))[0]
            fp.seek(pos + 26)
            name_len = struct.unpack('<H', fp.read(2))[0]
            extra_len = struct.unpack('<H', fp.read(2))[0]
            name_bytes = fp.read(name_len)
            fp.seek(extra_len, os.SEEK_CUR)
            data_pos = fp.tell()
            fp.seek(data_len, os.SEEK_CUR)  # skip over the file payload
            local_files[pos] = (name_bytes, data_pos, data_len, crc32)
        elif signature == 0x02014b50:  # central directory header
            fp.seek(pos + 28)
            name_len = struct.unpack('<H', fp.read(2))[0]
            extra_len = struct.unpack('<H', fp.read(2))[0]
            comment_len = struct.unpack('<H', fp.read(2))[0]
            fp.seek(pos + 42)
            # offset of the corresponding local file header
            pos2 = struct.unpack('<I', fp.read(4))[0]
            name_bytes = fp.read(name_len)
            fp.seek(pos + 46 + name_len + extra_len + comment_len)
            dir_files[name_bytes] = (pos, pos2)
        elif signature == 0x06054b50:  # end of central directory
            fp.seek(pos + 16)
            pos2 = struct.unpack('<I', fp.read(4))[0]
            eocd = (pos, pos2)
            break
        else:
            raise IOError('Not a zip file: unexpected signature 0x{:08x} at offset {}'.format(signature, pos))
    return local_files, dir_files, eocd
class NionReader(sidpy.Reader):
    """
    Reader that loads a Nion Swift file (``.ndata`` or ``.h5``) into a
    sidpy.Dataset, carrying along all metadata found in the file.
    """

    def __init__(self, file_path, verbose=False):
        """
        Parameters
        ----------
        file_path : str
            Path to a Nion Swift ``.ndata`` or ``.h5`` file.
        verbose : bool, optional
            Stored on the instance; intended for optional debug output.

        Raises
        ------
        FileNotFoundError
            If ``file_path`` does not exist.
        IOError
            If the file does not look like a Nion Swift file.
        """
        super().__init__(file_path)

        self.verbose = verbose
        self.__filename = file_path
        path, file_name = os.path.split(self.__filename)
        self.basename, self.extension = os.path.splitext(file_name)
        self.data_cube = None
        self.original_metadata = {}
        self.dimensions = []

        if 'ndata' in self.extension:
            # .ndata files are zip archives; verify by parsing the zip headers.
            try:
                self.__f = open(self.__filename, "rb")
            except FileNotFoundError:
                raise FileNotFoundError('File not found')
            try:
                local_files, dir_files, eocd = parse_zip(self.__f)
            except IOError:
                raise IOError("File {} does not seem to be of Nion`s .ndata format".format(self.__filename))
            finally:
                # Close even when parse_zip fails (the original leaked the handle).
                self.__f.close()
        elif self.extension == '.h5':
            try:
                # Open read-only: mode 'a' (as originally used) opens the file
                # writable and would even create a missing file.
                fp = h5py.File(self.__filename, mode='r')
                if 'data' not in fp:
                    fp.close()
                    raise IOError("File {} does not seem to be of Nion`s .h5 format".format(self.__filename))
                fp.close()
            except IOError:
                raise IOError("File {} does not seem to be of Nion`s .h5 format".format(self.__filename))

    def read(self):
        """
        Read the file content and return it as a sidpy.Dataset.

        Returns
        -------
        sidpy.Dataset
            Dataset with dimensions, original metadata, title, source,
            data_type and modality populated.

        Raises
        ------
        FileNotFoundError
            If the file disappeared since construction.
        IOError
            If a ``.ndata`` archive is missing its data or metadata entry.
        """
        if 'ndata' in self.extension:
            try:
                self.__f = open(self.__filename, "rb")
            except FileNotFoundError:
                raise FileNotFoundError('File not found')
            local_files, dir_files, eocd = parse_zip(self.__f)
            contains_data = b"data.npy" in dir_files
            contains_metadata = b"metadata.json" in dir_files
            if not (contains_data and contains_metadata):
                # Original computed these flags but never used them; a missing
                # entry then surfaced as a bare KeyError below.
                self.__f.close()
                raise IOError("File {} does not seem to be of Nion`s .ndata format".format(self.__filename))
            # numpy payload
            self.__f.seek(local_files[dir_files[b"data.npy"][1]][1])
            self.data_cube = np.load(self.__f)
            # JSON metadata payload
            json_pos = local_files[dir_files[b"metadata.json"][1]][1]
            json_len = local_files[dir_files[b"metadata.json"][1]][2]
            self.__f.seek(json_pos)
            json_properties = self.__f.read(json_len)
            self.original_metadata = json.loads(json_properties.decode("utf-8"))
            self.__f.close()
        elif self.extension == '.h5':
            # TODO: use lazy load for large datasets
            # Read-only open; the original used mode 'a' (writable).
            self.__f = h5py.File(self.__filename, 'r')
            if 'data' in self.__f:
                # Default to "{}" so a missing attribute does not crash json.loads.
                json_properties = self.__f['data'].attrs.get("properties", "{}")
                self.data_cube = self.__f['data'][:]
                self.original_metadata = json.loads(json_properties)
            self.__f.close()

        self.get_dimensions()

        # Need to switch image dimensions in Nion format.
        image_dims = []
        spectral_dims = []
        for dim, axis in enumerate(self.dimensions):
            if axis.dimension_type == sidpy.DimensionType.SPATIAL:
                image_dims.append(dim)
            if axis.dimension_type == sidpy.DimensionType.SPECTRAL:
                spectral_dims.append(dim)

        # Convert a line scan (n x N) to a spectrum image (1 x n x N).
        if len(image_dims) == 1:
            if self.data_cube.ndim > 1:
                self.data_cube = self.data_cube.reshape(1, self.data_cube.shape[0], self.data_cube.shape[1])
                # After the reshape the y axis is axis 1 (length n); the
                # original used shape[0], which is 1 after reshaping.
                new_dims = [sidpy.Dimension([1], name='x', units='pixels',
                                            quantity='distance', dimension_type='spatial'),
                            sidpy.Dimension(np.arange(self.data_cube.shape[1]), name='y', units='pixels',
                                            quantity='distance', dimension_type='spatial'),
                            self.dimensions[spectral_dims[0]]]
                self.dimensions = new_dims
        if len(image_dims) == 2:
            # Nion stores images with the two spatial axes swapped relative
            # to the sidpy convention; swap data and dimensions together.
            self.data_cube = np.swapaxes(self.data_cube, image_dims[0], image_dims[1])
            temp = self.dimensions[image_dims[0]].copy()
            self.dimensions[image_dims[0]] = self.dimensions[image_dims[1]].copy()
            self.dimensions[image_dims[1]] = temp

        dataset = sidpy.Dataset.from_array(self.data_cube)
        for dim, axis in enumerate(self.dimensions):
            dataset.set_dimension(dim, axis)
        dataset.original_metadata = self.original_metadata

        # Calibrations without units get a 'pixels' placeholder.
        if 'dimensional_calibrations' in dataset.original_metadata:
            for dim in dataset.original_metadata['dimensional_calibrations']:
                if dim['units'] == '':
                    dim['units'] = 'pixels'

        dataset.quantity = 'intensity'
        dataset.units = 'counts'

        # Title: prefer description.title, then title, then the file base name.
        if 'description' in dataset.original_metadata:
            dataset.title = dataset.original_metadata['description']['title']
        elif 'title' in dataset.original_metadata:
            dataset.title = dataset.original_metadata['title']
        else:
            path, file_name = os.path.split(self.__filename)
            basename, extension = os.path.splitext(file_name)
            dataset.title = basename

        if 'data_source' in dataset.original_metadata:
            dataset.source = dataset.original_metadata['data_source']
        else:
            dataset.source = 'NionReader'

        self.set_data_type(dataset)
        dataset.modality = 'STEM data'
        dataset.h5_dataset = None
        return dataset

    def set_data_type(self, dataset):
        """
        Assign ``dataset.data_type`` from its dimensionality and dimension types.

        Parameters
        ----------
        dataset : sidpy.Dataset
            Dataset whose ``data_type`` (and, for image stacks, frame
            dimension) will be set in place.

        Raises
        ------
        NotImplementedError
            For data with more than three dimensions.
        """
        spectral_dim = any(axis.dimension_type == sidpy.DimensionType.SPECTRAL
                           for axis in dataset._axes.values())

        if len(dataset.shape) > 3:
            raise NotImplementedError('Data_type not implemented yet')
        elif len(dataset.shape) == 3:
            if spectral_dim:
                # Enum for consistency; the original mixed the string
                # 'spectral_image' with sidpy.DataType enums elsewhere.
                dataset.data_type = sidpy.DataType.SPECTRAL_IMAGE
            else:
                dataset.data_type = sidpy.DataType.IMAGE_STACK
                # Relabel the first non-spatial dimension as the frame axis.
                for dim, axis in dataset._axes.items():
                    if axis.dimension_type != sidpy.DimensionType.SPATIAL:
                        dataset.set_dimension(dim, sidpy.Dimension(axis.values,
                                                                   name='frame',
                                                                   units='frame',
                                                                   quantity='stack',
                                                                   dimension_type=sidpy.DimensionType.TEMPORAL))
                        break
        elif len(dataset.shape) == 2:
            if spectral_dim:
                dataset.data_type = sidpy.DataType.SPECTRAL_IMAGE
            else:
                dataset.data_type = sidpy.DataType.IMAGE
        elif len(dataset.shape) == 1:
            if spectral_dim:
                dataset.data_type = sidpy.DataType.SPECTRUM
            else:
                dataset.data_type = sidpy.DataType.LINE_PLOT

    def get_dimensions(self):
        """
        Populate ``self.dimensions`` with sidpy.Dimension objects built from
        the metadata's dimensional (or legacy spatial) calibrations.

        Dimension types are inferred from the units string: 'eV' -> spectral,
        '1/...'/'mrad'/'rad' -> reciprocal, 'nm' -> spatial, else generic.
        Returns None and leaves ``self.dimensions`` untouched when no
        calibration list is present.
        """
        dic = self.original_metadata
        reciprocal_name = 'u'
        spatial_name = 'x'
        if 'dimensional_calibrations' in dic:
            dimension_list = dic['dimensional_calibrations']
        elif 'spatial_calibrations' in dic:
            dimension_list = dic['spatial_calibrations']
        else:
            return
        for dim, dimension_tags in enumerate(dimension_list):
            units = dimension_tags['units']
            # NOTE(review): this computes (index - offset) * scale; Nion's own
            # calibration convention is usually index * scale + offset --
            # kept as-is, verify against nionswift's Calibration class.
            values = (np.arange(self.data_cube.shape[int(dim)]) - dimension_tags['offset']) * dimension_tags['scale']
            if 'eV' == units:
                self.dimensions.append(sidpy.Dimension(values, name='energy_loss', units=units,
                                                       quantity='energy-loss', dimension_type='spectral'))
            elif 'eV' in units:
                self.dimensions.append(sidpy.Dimension(values, name='energy', units=units,
                                                       quantity='energy', dimension_type='spectral'))
            elif '1/' in units or units in ['mrad', 'rad']:
                self.dimensions.append(sidpy.Dimension(values, name=reciprocal_name, units=units,
                                                       quantity='reciprocal distance',
                                                       dimension_type='reciprocal'))
                reciprocal_name = chr(ord(reciprocal_name) + 1)
            elif 'nm' in units:
                self.dimensions.append(sidpy.Dimension(values, name=spatial_name, units=units,
                                                       quantity='distance', dimension_type='spatial'))
                spatial_name = chr(ord(spatial_name) + 1)
            else:
                self.dimensions.append(sidpy.Dimension(values, name='generic_{}'.format(dim), units='generic',
                                                       quantity='generic', dimension_type='UNKNOWN'))

    def get_filename(self):
        """Return the path of the file this reader was constructed with."""
        return self.__filename

    filename = property(get_filename)

    def get_raw(self):
        """Return the raw data array.

        Fixed: the original returned ``self.data``, i.e. the very property
        this method backs, recursing until RecursionError.
        """
        return self.data_cube

    data = property(get_raw)

    def get_tags(self):
        """Return the original metadata dictionary read from the file."""
        return self.original_metadata

    tags = property(get_tags)