# -*- coding: utf-8 -*-
"""
dicom2nifti
@author: abrys
"""
from __future__ import print_function
import dicom2nifti.patch_pydicom_encodings
dicom2nifti.patch_pydicom_encodings.apply()
import os
import traceback
import logging
import nibabel
import numpy
import pydicom.config as pydicom_config
from pydicom.tag import Tag
import dicom2nifti.common as common
import dicom2nifti.settings as settings
import dicom2nifti.convert_generic as convert_generic
from dicom2nifti.exceptions import ConversionError
pydicom_config.enforce_valid_values = False
logger = logging.getLogger(__name__)
[docs]def dicom_to_nifti(dicom_input, output_file=None):
"""
This is the main dicom to nifti conversion fuction for philips images.
As input philips images are required. It will then determine the type of images and do the correct conversion
Examples: See unit test
:param output_file: file path to the output nifti
:param dicom_input: directory with dicom files for 1 scan
"""
assert common.is_philips(dicom_input)
if common.is_multiframe_dicom(dicom_input):
_assert_explicit_vr(dicom_input)
logger.info('Found multiframe dicom')
if _is_multiframe_4d(dicom_input):
logger.info('Found sequence type: MULTIFRAME 4D')
return _multiframe_to_nifti(dicom_input, output_file)
if _is_multiframe_anatomical(dicom_input):
logger.info('Found sequence type: MULTIFRAME ANATOMICAL')
return _multiframe_to_nifti(dicom_input, output_file)
else:
logger.info('Found singleframe dicom')
grouped_dicoms = _get_grouped_dicoms(dicom_input)
if _is_singleframe_4d(dicom_input):
logger.info('Found sequence type: SINGLEFRAME 4D')
return _singleframe_to_nifti(grouped_dicoms, output_file)
logger.info('Assuming anatomical data')
return convert_generic.dicom_to_nifti(dicom_input, output_file)
def _assert_explicit_vr(dicom_input):
"""
Assert that explicit vr is used
"""
if settings.validate_multiframe_implicit:
header = dicom_input[0]
if header.file_meta[0x0002, 0x0010].value == '1.2.840.10008.1.2':
raise ConversionError('IMPLICIT_VR_ENHANCED_DICOM')
def _is_multiframe_diffusion_imaging(dicom_input):
"""
Use this function to detect if a dicom series is a philips multiframe dti dataset
NOTE: We already assue this is a 4D dataset as input
"""
header = dicom_input[0]
if "PerFrameFunctionalGroupsSequence" not in header:
return False
# check if there is diffusion info in the frame
found_diffusion = False
diffusion_tag = Tag(0x0018, 0x9117)
for frame in header.PerFrameFunctionalGroupsSequence:
if diffusion_tag in frame:
found_diffusion = True
break
if not found_diffusion:
return False
return True
def _is_multiframe_4d(dicom_input):
"""
Use this function to detect if a dicom series is a philips multiframe 4D dataset
"""
# check if it is multi frame dicom
if not common.is_multiframe_dicom(dicom_input):
return False
header = dicom_input[0]
# check if there are multiple stacks
number_of_stack_slices = common.get_ss_value(header[Tag(0x2001, 0x105f)][0][Tag(0x2001, 0x102d)])
number_of_stacks = int(int(header.NumberOfFrames) / number_of_stack_slices)
if number_of_stacks <= 1:
return False
return True
def _is_multiframe_anatomical(dicom_input):
"""
Use this function to detect if a dicom series is a philips multiframe anatomical dataset
NOTE: Only the first slice will be checked so you can only provide an already sorted dicom directory
(containing one series)
"""
# check if it is multi frame dicom
if not common.is_multiframe_dicom(dicom_input):
return False
header = dicom_input[0]
# check if there are multiple stacks
number_of_stack_slices = common.get_ss_value(header[Tag(0x2001, 0x105f)][0][Tag(0x2001, 0x102d)])
number_of_stacks = int(int(header.NumberOfFrames) / number_of_stack_slices)
if number_of_stacks > 1:
return False
return True
def _is_singleframe_4d(dicom_input):
"""
Use this function to detect if a dicom series is a philips singleframe 4D dataset
"""
header = dicom_input[0]
# check if there are stack information
slice_number_mr_tag = Tag(0x2001, 0x100a)
if slice_number_mr_tag not in header:
return False
# check if there are multiple timepoints
grouped_dicoms = _get_grouped_dicoms(dicom_input)
if len(grouped_dicoms) <= 1:
return False
return True
def _is_singleframe_diffusion_imaging(grouped_dicoms):
"""
Use this function to detect if a dicom series is a philips singleframe dti dataset
NOTE: We already assume singleframe 4D input
"""
# check that there is bval information
if _is_bval_type_b(grouped_dicoms):
return True
if _is_bval_type_a(grouped_dicoms):
return True
return False
def _is_bval_type_a(grouped_dicoms):
"""
Check if the bvals are stored in the first of 2 currently known ways for single frame dti
"""
bval_tag = Tag(0x2001, 0x1003)
bvec_x_tag = Tag(0x2005, 0x10b0)
bvec_y_tag = Tag(0x2005, 0x10b1)
bvec_z_tag = Tag(0x2005, 0x10b2)
for group in grouped_dicoms:
if bvec_x_tag in group[0] and _is_float(common.get_fl_value(group[0][bvec_x_tag])) and \
bvec_y_tag in group[0] and _is_float(common.get_fl_value(group[0][bvec_y_tag])) and \
bvec_z_tag in group[0] and _is_float(common.get_fl_value(group[0][bvec_z_tag])) and \
bval_tag in group[0] and _is_float(common.get_fl_value(group[0][bval_tag])) and \
common.get_fl_value(group[0][bval_tag]) != 0:
return True
return False
def _is_bval_type_b(grouped_dicoms):
"""
Check if the bvals are stored in the second of 2 currently known ways for single frame dti
"""
bval_tag = Tag(0x0018, 0x9087)
bvec_tag = Tag(0x0018, 0x9089)
for group in grouped_dicoms:
if bvec_tag in group[0] and bval_tag in group[0]:
bvec = common.get_fd_array_value(group[0][bvec_tag], 3)
bval = common.get_fd_value(group[0][bval_tag])
if _is_float(bvec[0]) and _is_float(bvec[1]) and _is_float(bvec[2]) and _is_float(bval) and bval != 0:
return True
return False
def _is_float(value):
"""
Check if float
"""
try:
float(value)
return True
except ValueError:
return False
def _multiframe_to_nifti(dicom_input, output_file):
"""
This function will convert philips 4D or anatomical multiframe series to a nifti
"""
# Read the multiframe dicom file
logger.info('Read dicom file')
multiframe_dicom = dicom_input[0]
# Create mosaic block
logger.info('Creating data block')
full_block = _multiframe_to_block(multiframe_dicom)
logger.info('Creating affine')
# Create the nifti header info
affine = _create_affine_multiframe(multiframe_dicom)
logger.info('Creating nifti')
# Convert to nifti
nii_image = nibabel.Nifti1Image(full_block, affine)
timing_parameters = multiframe_dicom.SharedFunctionalGroupsSequence[0].MRTimingAndRelatedParametersSequence[0]
first_frame = multiframe_dicom[Tag(0x5200, 0x9230)][0]
common.set_tr_te(nii_image, float(timing_parameters.RepetitionTime),
float(first_frame[0x2005, 0x140f][0].EchoTime))
# Save to disk
if output_file is not None:
logger.info('Saving nifti to disk %s' % output_file)
nii_image.to_filename(output_file)
if _is_multiframe_diffusion_imaging(dicom_input):
bval_file = None
bvec_file = None
if output_file is not None:
# Create the bval en bvec files
base_path = os.path.dirname(output_file)
base_name = os.path.splitext(os.path.splitext(os.path.basename(output_file))[0])[0]
logger.info('Creating bval en bvec files')
bval_file = '%s/%s.bval' % (base_path, base_name)
bvec_file = '%s/%s.bvec' % (base_path, base_name)
bval, bvec, bval_file, bvec_file = _create_bvals_bvecs(multiframe_dicom, bval_file, bvec_file, nii_image,
output_file)
return {'NII_FILE': output_file,
'BVAL_FILE': bval_file,
'BVEC_FILE': bvec_file,
'NII': nii_image,
'BVAL': bval,
'BVEC': bvec}
return {'NII_FILE': output_file,
'NII': nii_image}
def _singleframe_to_nifti(grouped_dicoms, output_file):
"""
This function will convert a philips singleframe series to a nifti
"""
# Create mosaic block
logger.info('Creating data block')
full_block = _singleframe_to_block(grouped_dicoms)
logger.info('Creating affine')
# Create the nifti header info
affine = common.create_affine(grouped_dicoms[0])
logger.info('Creating nifti')
# Convert to nifti
nii_image = nibabel.Nifti1Image(full_block, affine)
common.set_tr_te(nii_image, float(grouped_dicoms[0][0].RepetitionTime), float(grouped_dicoms[0][0].EchoTime))
if output_file is not None:
# Save to disk
logger.info('Saving nifti to disk %s' % output_file)
nii_image.to_filename(output_file)
if _is_singleframe_diffusion_imaging(grouped_dicoms):
bval_file = None
bvec_file = None
# Create the bval en bvec files
if output_file is not None:
base_name = os.path.splitext(output_file)[0]
if base_name.endswith('.nii'):
base_name = os.path.splitext(base_name)[0]
logger.info('Creating bval en bvec files')
bval_file = '%s.bval' % base_name
bvec_file = '%s.bvec' % base_name
nii_image, bval, bvec, bval_file, bvec_file = _create_singleframe_bvals_bvecs(grouped_dicoms,
bval_file,
bvec_file,
nii_image,
output_file)
return {'NII_FILE': output_file,
'BVAL_FILE': bval_file,
'BVEC_FILE': bvec_file,
'NII': nii_image,
'BVAL': bval,
'BVEC': bvec}
return {'NII_FILE': output_file,
'NII': nii_image}
def _singleframe_to_block(grouped_dicoms):
"""
Generate a full datablock containing all timepoints
"""
# For each slice / mosaic create a data volume block
data_blocks = []
for index in range(0, len(grouped_dicoms)):
logger.info('Creating block %s of %s' % (index + 1, len(grouped_dicoms)))
current_block = _stack_to_block(grouped_dicoms[index])
current_block = current_block[:, :, :, numpy.newaxis]
data_blocks.append(current_block)
try:
full_block = numpy.concatenate(data_blocks, axis=3)
except:
traceback.print_exc()
raise ConversionError("MISSING_DICOM_FILES")
# Apply the rescaling if needed
common.apply_scaling(full_block, grouped_dicoms[0][0])
return full_block
def _stack_to_block(timepoint_dicoms):
"""
Convert a mosaic slice to a block of data by reading the headers, splitting the mosaic and appending
"""
return common.get_volume_pixeldata(timepoint_dicoms)
def _get_grouped_dicoms(dicom_input):
"""
Search all dicoms in the dicom directory, sort and validate them
fast_read = True will only read the headers not the data
"""
# if all dicoms have an instance number try sorting by instance number else by position
if [d for d in dicom_input if 'InstanceNumber' in d]:
dicoms = sorted(dicom_input, key=lambda x: x.InstanceNumber)
else:
dicoms = common.sort_dicoms(dicom_input)
# now group per stack
grouped_dicoms = [[]] # list with first element a list
timepoint_index = 0
previous_stack_position = -1
# loop over all sorted dicoms
stack_position_tag = Tag(0x2001, 0x100a) # put this there as this is a slow step and used a lot
for index in range(0, len(dicoms)):
dicom_ = dicoms[index]
stack_position = 0
if stack_position_tag in dicom_:
stack_position = common.get_is_value(dicom_[stack_position_tag])
if previous_stack_position == stack_position:
# if the stack number is the same we move to the next timepoint
timepoint_index += 1
if len(grouped_dicoms) <= timepoint_index:
grouped_dicoms.append([])
else:
# if it changes move back to the first timepoint
timepoint_index = 0
grouped_dicoms[timepoint_index].append(dicom_)
previous_stack_position = stack_position
return grouped_dicoms
def _create_affine_multiframe(multiframe_dicom):
"""
Function to create the affine matrix for a siemens mosaic dataset
This will work for siemens dti and 4D if in mosaic format
"""
first_frame = multiframe_dicom[Tag(0x5200, 0x9230)][0]
# Create affine matrix (http://nipy.sourceforge.net/nibabel/dicom/dicom_orientation.html#dicom-slice-affine)
image_orient1 = numpy.array(first_frame.PlaneOrientationSequence[0].ImageOrientationPatient)[0:3].astype(float)
image_orient2 = numpy.array(first_frame.PlaneOrientationSequence[0].ImageOrientationPatient)[3:6].astype(float)
normal = numpy.cross(image_orient1, image_orient2)
delta_r = float(first_frame[0x2005, 0x140f][0].PixelSpacing[0])
delta_c = float(first_frame[0x2005, 0x140f][0].PixelSpacing[1])
image_pos = numpy.array(first_frame.PlanePositionSequence[0].ImagePositionPatient).astype(float)
delta_s = float(multiframe_dicom.SpacingBetweenSlices)
return numpy.matrix(
[[-image_orient1[0] * delta_c, -image_orient2[0] * delta_r, -delta_s * normal[0], -image_pos[0]],
[-image_orient1[1] * delta_c, -image_orient2[1] * delta_r, -delta_s * normal[1], -image_pos[1]],
[image_orient1[2] * delta_c, image_orient2[2] * delta_r, delta_s * normal[2], image_pos[2]],
[0, 0, 0, 1]])
def _multiframe_to_block(multiframe_dicom):
"""
Generate a full datablock containing all stacks
"""
# Calculate the amount of stacks and slices in the stack
number_of_stack_slices = int(common.get_ss_value(multiframe_dicom[Tag(0x2001, 0x105f)][0][Tag(0x2001, 0x102d)]))
number_of_stacks = int(int(multiframe_dicom.NumberOfFrames) / number_of_stack_slices)
# We create a numpy array
size_x = multiframe_dicom.pixel_array.shape[2]
size_y = multiframe_dicom.pixel_array.shape[1]
size_z = number_of_stack_slices
size_t = number_of_stacks
# get the format
format_string = common.get_numpy_type(multiframe_dicom)
# get header info needed for ordering
frame_info = multiframe_dicom[0x5200, 0x9230]
data_4d = numpy.zeros((size_z, size_y, size_x, size_t), dtype=format_string)
# loop over each slice and insert in datablock
t_location_index = _get_t_position_index(multiframe_dicom)
for slice_index in range(0, size_t * size_z):
z_location = frame_info[slice_index].FrameContentSequence[0].InStackPositionNumber - 1
if t_location_index is None:
t_location = frame_info[slice_index].FrameContentSequence[0].TemporalPositionIndex - 1
else:
t_location = frame_info[slice_index].FrameContentSequence[0].DimensionIndexValues[t_location_index] - 1
block_data = multiframe_dicom.pixel_array[slice_index, :, :]
# apply scaling
rescale_intercept = frame_info[slice_index].PixelValueTransformationSequence[0].RescaleIntercept
rescale_slope = frame_info[slice_index].PixelValueTransformationSequence[0].RescaleSlope
private_scale_slope = 1.0
private_scale_intercept = 0.0
private_sequence_tag = Tag(0x2005, 0x140f)
private_scale_intercept_tag = Tag(0x2005, 0x100d)
private_scale_slope_tag = Tag(0x2005, 0x100e)
if private_sequence_tag in frame_info[slice_index]:
if private_scale_intercept_tag in frame_info[slice_index][private_sequence_tag][0]:
private_scale_intercept = common.get_fl_value(frame_info[slice_index][private_sequence_tag][0][
private_scale_intercept_tag])
if private_scale_slope_tag in frame_info[slice_index][private_sequence_tag][0]:
private_scale_slope = common.get_fl_value(
frame_info[slice_index][private_sequence_tag][0][private_scale_slope_tag])
block_data = common.do_scaling(block_data,
rescale_slope, rescale_intercept,
private_scale_slope, private_scale_intercept)
# switch to float if needed
if block_data.dtype != data_4d.dtype:
data_4d = data_4d.astype(block_data.dtype)
data_4d[z_location, :, :, t_location] = block_data
full_block = numpy.zeros((size_x, size_y, size_z, size_t), dtype=data_4d.dtype)
# loop over each stack and reorganize the data
for t_index in range(0, size_t):
# transpose the block so the directions are correct
data_3d = numpy.transpose(data_4d[:, :, :, t_index], (2, 1, 0))
# add the block the the full data
full_block[:, :, :, t_index] = data_3d
return full_block
def _get_t_position_index(multiframe_dicom):
# First try temporal position index itself
if 'DimensionIndexSequence' not in multiframe_dicom:
return None
for current_index in range(len(multiframe_dicom.DimensionIndexSequence)):
item = multiframe_dicom.DimensionIndexSequence[current_index]
if 'DimensionDescriptionLabel' in item and \
'Temporal Position Index' in item.DimensionDescriptionLabel:
return current_index
# This seems to work for most dti
for current_index in range(len(multiframe_dicom.DimensionIndexSequence)):
item = multiframe_dicom.DimensionIndexSequence[current_index]
if 'DimensionDescriptionLabel' in item and \
'Diffusion Gradient Orientation' in item.DimensionDescriptionLabel:
return current_index
# This seems to work for 3D grace sequences
for current_index in range(len(multiframe_dicom.DimensionIndexSequence)):
item = multiframe_dicom.DimensionIndexSequence[current_index]
if 'DimensionDescriptionLabel' in item and \
'Effective Echo Time' in item.DimensionDescriptionLabel:
return current_index
# First try trigger delay time (inspired by http://www.dclunie.com/papers/SCAR_20040522_CTMRMF.pdf)
for current_index in range(len(multiframe_dicom.DimensionIndexSequence)):
item = multiframe_dicom.DimensionIndexSequence[current_index]
if 'DimensionDescriptionLabel' in item and \
'Trigger Delay Time' in item.DimensionDescriptionLabel:
return current_index
return None
def _create_bvals_bvecs(multiframe_dicom, bval_file, bvec_file, nifti, nifti_file):
"""
Write the bvals from the sorted dicom files to a bval file
Inspired by https://github.com/IBIC/ibicUtils/blob/master/ibicBvalsBvecs.py
"""
# create the empty arrays
number_of_stack_slices = common.get_ss_value(multiframe_dicom[Tag(0x2001, 0x105f)][0][Tag(0x2001, 0x102d)])
number_of_stacks = int(int(multiframe_dicom.NumberOfFrames) / number_of_stack_slices)
bvals = numpy.zeros([number_of_stacks], dtype=numpy.int32)
bvecs = numpy.zeros([number_of_stacks, 3])
# loop over all timepoints and create a list with all bvals and bvecs
for stack_index in range(0, number_of_stacks):
stack = multiframe_dicom[Tag(0x5200, 0x9230)][stack_index]
if str(stack[Tag(0x0018, 0x9117)][0][Tag(0x0018, 0x9075)].value) == 'DIRECTIONAL':
bvals[stack_index] = common.get_fd_value(stack[Tag(0x0018, 0x9117)][0][Tag(0x0018, 0x9087)])
bvecs[stack_index, :] = common.get_fd_array_value(stack[Tag(0x0018, 0x9117)][0]
[Tag(0x0018, 0x9076)][0][Tag(0x0018, 0x9089)], 3)
# truncate nifti if needed
nifti, bvals, bvecs = _fix_diffusion_images(bvals, bvecs, nifti, nifti_file)
# save the found bvecs to the file
if numpy.count_nonzero(bvals) > 0 or numpy.count_nonzero(bvecs) > 0:
common.write_bval_file(bvals, bval_file)
common.write_bvec_file(bvecs, bvec_file)
else:
bval_file = None
bvec_file = None
bvals = None
bvecs = None
return bvals, bvecs, bval_file, bvec_file
def _fix_diffusion_images(bvals, bvecs, nifti, nifti_file):
"""
This function will remove the last timepoint from the nifti, bvals and bvecs if the last vector is 0,0,0
This is sometimes added at the end by philips
"""
# if all zero continue of if the last bvec is not all zero continue
if numpy.count_nonzero(bvecs) == 0 or not numpy.count_nonzero(bvals[-1]) == 0:
# nothing needs to be done here
return nifti, bvals, bvecs
# remove last elements from bvals and bvecs
bvals = bvals[:-1]
bvecs = bvecs[:-1]
# remove last elements from the nifti
new_nifti = nibabel.Nifti1Image(nifti.get_data()[:, :, :, :-1], nifti.affine)
new_nifti.to_filename(nifti_file)
return new_nifti, bvals, bvecs
def _create_singleframe_bvals_bvecs(grouped_dicoms, bval_file, bvec_file, nifti, nifti_file):
"""
Write the bvals from the sorted dicom files to a bval file
"""
# create the empty arrays
bvals = numpy.zeros([len(grouped_dicoms)], dtype=numpy.int32)
bvecs = numpy.zeros([len(grouped_dicoms), 3])
# loop over all timepoints and create a list with all bvals and bvecs
if _is_bval_type_a(grouped_dicoms):
bval_tag = Tag(0x2001, 0x1003)
bvec_x_tag = Tag(0x2005, 0x10b0)
bvec_y_tag = Tag(0x2005, 0x10b1)
bvec_z_tag = Tag(0x2005, 0x10b2)
for stack_index in range(0, len(grouped_dicoms)):
bvals[stack_index] = common.get_fl_value(grouped_dicoms[stack_index][0][bval_tag])
bvecs[stack_index, :] = [common.get_fl_value(grouped_dicoms[stack_index][0][bvec_x_tag]),
common.get_fl_value(grouped_dicoms[stack_index][0][bvec_y_tag]),
common.get_fl_value(grouped_dicoms[stack_index][0][bvec_z_tag])]
elif _is_bval_type_b(grouped_dicoms):
bval_tag = Tag(0x0018, 0x9087)
bvec_tag = Tag(0x0018, 0x9089)
for stack_index in range(0, len(grouped_dicoms)):
bvals[stack_index] = common.get_fd_value(grouped_dicoms[stack_index][0][bval_tag])
bvecs[stack_index, :] = common.get_fd_array_value(grouped_dicoms[stack_index][0][bvec_tag], 3)
# truncate nifti if needed
nifti, bvals, bvecs = _fix_diffusion_images(bvals, bvecs, nifti, nifti_file)
# save the found bvecs to the file
if numpy.count_nonzero(bvals) > 0 or numpy.count_nonzero(bvecs) > 0:
common.write_bval_file(bvals, bval_file)
common.write_bvec_file(bvecs, bvec_file)
else:
bval_file = None
bvec_file = None
bvals = None
bvecs = None
return nifti, bvals, bvecs, bval_file, bvec_file