Source code for toor.ImageReader.DICOM.writeDicom

import os
import pydicom
from pydicom.dataset import Dataset, FileDataset, FileMetaDataset
from pydicom.uid import ImplicitVRLittleEndian
from datetime import date, timedelta
import logging
from dateutil.parser import parse
import numpy as np

from . import (Patient, GeneralImage, GeneralReference, ImagePlane, ImagePixel, Device, PETImage, FrameOfReference,
               GeneralStudy, GeneralSeries, PETIsotope, PETSeries, PatientStudy, Equipment)


[docs] class WriteDicom: def __init__(self, parent=None, convert_from_raw=False, path_dcm=None, volume=None): if parent is None: return self.current_type_of_reconstruction = parent.current_type_of_reconstruction self.acquisitionInfo = Dict2Class(parent.acquisitionInfo) self.systemConfigurationsInfo = Dict2Class(parent.systemConfigurations_info) self.convert_from_raw = convert_from_raw if convert_from_raw: self.reconstructionInfo = Dict2Class(parent.raw_data_heder_info) else: self.reconstructionInfo = parent if path_dcm is None: self.path_dcm = parent.path_dcm else: self.path_dcm = path_dcm if volume is None: self.volume = np.zeros((10, 10, 10)) else: self.volume = volume self._seriesNumber = "1" self._load_series_number() self.seriesType = self.current_type_of_reconstruction self.seriesTypeValue2 = "IMAGE" self.seriesInstanceUID = pydicom.uid.generate_uid() self.sliceThickness = None self.fc = None self.updateVolume(self.volume) self._time_id_series = 0 # t = np.around(volume, decimals=2) self.filename_endian = os.path.join(self.path_dcm, str(self._seriesNumber), "IM{}_f{}_s{}ID_{}{}.dcm".format(0, 0, self._seriesNumber, self.acquisitionInfo.Id, self.acquisitionInfo.Tracer)) self.file_meta = FileMetaDataset() self.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian self.file_meta.MediaStorageSOPClassUID = '1.2.840.10008.5.1.4.1.1.128' self.file_meta.MediaStorageSOPInstanceUID = '1.2.840.10008.1.3.10' self.file_meta.ImplementationClassUID = '1.3.6.1.4.1.9590.100.1.0.100.4.0' self.ds = FileDataset(self.filename_endian, {}, file_meta=self.file_meta, preamble=b"\0" * 128) self.patient = Patient(self.filename_endian, {}, self.ds) self.generalStudy = GeneralStudy(self.filename_endian, {}, self.ds) self.patientStudy = PatientStudy(self.filename_endian, {}, self.ds) self.generalSeries = GeneralSeries(self.filename_endian, {}, self.ds) self.petSeries = PETSeries(self.filename_endian, {}, self.ds) self.petIsotope = PETIsotope(self.filename_endian, {}, self.ds) self.frameReference = FrameOfReference(self.filename_endian, {}, self.ds) self.generalEquipment = Equipment(self.filename_endian, {}, self.ds) self.generalImage = GeneralImage(self.filename_endian, {}, self.ds) self.imagePlane = ImagePlane(self.filename_endian, {}, self.ds) self.imagePixel = ImagePixel(self.filename_endian, {}, self.ds) self.petImage = PETImage(self.filename_endian, {}, self.ds) self.device = Device(self.filename_endian, {}, self.ds)
[docs] def updateVolume(self, volume): self.volume = volume self.fc = np.max(self.volume) / 65535 # int16 self.volume /= self.fc self.volume = self.volume.astype(dtype=np.uint16) self.sliceThickness = (self.systemConfigurationsInfo.Array_crystal_x * self.systemConfigurationsInfo.Crystal_pitch_x + (self.systemConfigurationsInfo.Array_crystal_x - 1) * 2 * self.systemConfigurationsInfo.Reflector_interior_a_x) / self.volume.shape[2]
@property def time_id_series(self): return self._time_id_series
[docs] def updatetimeIdSeries(self, new_id: int): if self._time_id_series != new_id: self._time_id_series = new_id return self._time_id_series
@property def seriesNumber(self): return self._seriesNumber
[docs] def updateSeriesNumber(self, new_number: str): if self._seriesNumber != new_number: self._seriesNumber = new_number return self._seriesNumber
def _load_series_number(self): try: series_number = np.load(os.path.join(self.path_dcm, "series_number.npy")) self.updateSeriesNumber(str(series_number)) print(f"Series: {self._seriesNumber}") except FileNotFoundError: self.updateSeriesNumber(str(1)) np.save(os.path.join(self.path_dcm, "series_number.npy"), int(self.seriesNumber) + 1) def _generate_dicom_instance(self, slice_number=0, time_id=0): logging.debug('START') self.filename_endian = os.path.join(self.path_dcm, self._seriesNumber, "IM{}_f{}_s{}ID_{}{}.dcm".format(slice_number, time_id, self._seriesNumber, self.acquisitionInfo.Id, self.acquisitionInfo.Tracer)) # self.ds = FileDataset(self.filename_endian, {}, file_meta=self.file_meta, preamble=b"\0" * 128) self.ds.__init__(self.filename_endian, {}, file_meta=self.file_meta) self.ds.is_little_endian = True self.ds.is_implicit_VR = True self.ds.SOPInstanceUID = pydicom.uid.generate_uid() self.patient = Patient(self.filename_endian, {}, self.ds) self.generalStudy = GeneralStudy(self.filename_endian, {}, self.ds) self.patientStudy = PatientStudy(self.filename_endian, {}, self.ds) self.generalSeries = GeneralSeries(self.filename_endian, {}, self.ds) self.petSeries = PETSeries(self.filename_endian, {}, self.ds) self.petIsotope = PETIsotope(self.filename_endian, {}, self.ds) self.frameReference = FrameOfReference(self.filename_endian, {}, self.ds) self.generalEquipment = Equipment(self.filename_endian, {}, self.ds) self.generalImage = GeneralImage(self.filename_endian, {}, self.ds) self.imagePlane = ImagePlane(self.filename_endian, {}, self.ds) self.imagePixel = ImagePixel(self.filename_endian, {}, self.ds) self.petImage = PETImage(self.filename_endian, {}, self.ds) self.device = Device(self.filename_endian, {}, self.ds) self.default_values() self.patient.override(self.acquisitionInfo) self.generalSeries.override(self.acquisitionInfo, self, self.reconstructionInfo) self.petSeries.override(self.reconstructionInfo, self.acquisitionInfo, self) self.petIsotope.override(self.acquisitionInfo) self.generalImage.override(self.acquisitionInfo, self) self.imagePlane.override(self.reconstructionInfo, self) self.imagePixel.override() self.petImage.override(self) self.device.override() self.frameReference.override(self.acquisitionInfo) # self.patient_module() self.pet_corrections() # self.series_modules() # self.pet_isotope() # self.general_image() # self.frame_reference() logging.debug('END')
[docs] def default_values(self): # self.ds.SamplesPerPixel = 1 # self.ds.PhotometricInterpretation = 'MONOCHROME2' # self.ds.BitsAllocated = 16 # self.ds.BitsStored = 16 # self.ds.HighBit = 15 # self.ds.PixelRepresentation = 0 # self.ds.RescaleIntercept = 0 # self.ds.RescaleSlope = 1 # self.ds.Modality = 'PT' # self.ds.CollimatorType = 'NONE' # self.ds.Units = 'BQML' # poderá vir outras unidades # self.ds.CountsSource = 'EMISSION' self.ds.ReprojectionMethod = "" self.ds.SOPClassUID = '1.2.840.10008.5.1.4.1.1.128' self.ds.StudyID = "01245405" # STUDY = PET self.ds.ReferringPhysicianName = "" self.ds.AccessionNumber = "" self.ds.DecayCorrection = "START" self.ds.ActualFrameDuration = None
[docs] def pet_corrections(self): logging.debug('START') self.ds.CorrectedImage = ['DECY', 'NORM', 'DCAL', 'RADL'] # self.ds.AttenuationCorrectionMethod = self.imageInfo['attenuation_data'] # Extra info taht not present in standard DICOM IOD # self.ds.RandomsCorrectionMethod = 'NONE' if(self.imageInfo['random_correction'] != True) else 'INCOMPLETO' # self.ds.ScatterCorrected = 'YES' if (self.imageInfo['scatter_angle_correction']) else 'NO' # self.ds.AttenuationCorrected = 'YES' if (self.imageInfo['attenuation_correction']) else 'NO' # self.ds.DeadTimeCorrected = 'YES' if (self.imageInfo['dead_time_correction']) else 'NO' # self.ds.PatientMotionCorrected = 'YES' if(self.imageInfo['respiratory_movement_correction'] or # self.imageInfo['heart_movement_correction']) else 'NO' # self.ds.RandomsCorrected = 'YES' if(self.imageInfo['random_correction'] or # self.imageInfo['random_correction']) else 'NO' self.ds.ScatterFractionFactor = 0 self.ds.DeadTimeFactor = 1 self.ds.DecayFactor = 1 self.ds.DoseCalibrationFactor = 1 logging.debug('END')
[docs] def write_dicom_file(self): logging.debug('START') try: self.acquisitionInfo.TypeOfSubject except AttributeError: # QtWidgets.QMessageBox.warning(None, "WARNING!", "It is impossible to export data. " # "Acquisition wasn't done with software latest version.", # QtWidgets.QMessageBox.Ok) return if self.convert_from_raw: dicom_folder = os.path.join(self.path_dcm, 'dicom') if not os.path.exists(dicom_folder): os.mkdir(dicom_folder) file_path_dcm = os.path.join(dicom_folder, str(self._seriesNumber)) else: file_path_dcm = os.path.join(self.path_dcm, str(self._seriesNumber)) if not os.path.exists(file_path_dcm): os.mkdir(file_path_dcm) image_index = 0 for i in range(0, self.volume.shape[2]): if self.current_type_of_reconstruction == "WHOLE BODY" or self.current_type_of_reconstruction == "STATIC": image_index = i elif self.current_type_of_reconstruction == "DYNAMIC": image_index = self.volume.shape[2] * self.time_id_series + i time_init_frame = self.reconstructionInfo.reading_data[ self.acquisitionInfo.TurnEndIndex[self.time_id_series], 6] * 1000 time_end_frame = self.reconstructionInfo.reading_data[ self.acquisitionInfo.TurnEndIndex[self.time_id_series + 1], 6] * 1000 average_time_frame = ((time_end_frame - time_init_frame) / 2 + time_init_frame) self.ds.FrameReferenceTime = average_time_frame self.ds.ActualFrameDuration = int(time_end_frame - time_init_frame) self.ds.AcquisitionTime = (parse(self.acquisitionInfo.AcquisitionStartTime) + timedelta(seconds=time_init_frame / 1000)).strftime("%H%M%S") self.ds.ContentTime = (parse(self.acquisitionInfo.AcquisitionStartTime) + timedelta(seconds=time_init_frame / 1000)).strftime("%H%M%S") self._generate_dicom_instance(i, self.time_id_series) self.ds.InstanceNumber = "{}".format(image_index) self.ds.ImageIndex = image_index # self.ds.TimeSliceIndex = self.time_id_series self.ds.PixelSpacing = [self.reconstructionInfo.pixelSizeXY, self.reconstructionInfo.pixelSizeXY] self.ds.SliceThickness = self.sliceThickness self.imagePlane.update_image_position(0, self.sliceThickness * i) # self.imagePlane.update_image_position(0, 0) slice_ = self.volume[:, :, i] if slice_.dtype != np.uint16: slice_ = slice_.astype(np.uint16) self.ds.PixelData = slice_.tobytes() self.ds.save_as(self.filename_endian) logging.debug('END')
[docs] class Dict2Class(object): def __init__(self, my_dict): for key in my_dict: key_update = key.split(" ") for k in range(len(key_update)): key_update[k] = key_update[k].capitalize() print(k) key_update = "".join(key_update) key_update = key_update.replace("-", "") key_update = key_update.replace("%", "") key_update = key_update.replace("(", "") key_update = key_update.replace(")", "") key_update = key_update.replace(",", "") setattr(self, key_update.replace(" ", ""), my_dict[key])