References

convert(input_filename, output_folder, alt_output_folder=None, output_format='omezarr2', show_progress=False, verbose=False)

Convert an input file to OME format and write to output folder(s).

Parameters:

Name               Type  Description                       Default
input_filename     str   Path to the input file.           required
output_folder      str   Output folder path.               required
alt_output_folder  str   Alternative output folder path.   None
output_format      str   Output format string.             'omezarr2'
show_progress      bool  If True, print progress.          False
verbose            bool  If True, enable verbose logging.  False

Returns:

Name Type Description
str

JSON string containing an array with one conversion result dict.

Source code in converter.py
def convert(input_filename, output_folder, alt_output_folder=None,
            output_format='omezarr2', show_progress=False, verbose=False):
    """
    Convert an input file to OME format and write to output folder(s).

    Args:
        input_filename (str): Path to the input file.
        output_folder (str): Output folder path.
        alt_output_folder (str, optional): Alternative output folder path.
        output_format (str): Output format string.
        show_progress (bool): If True, print progress.
        verbose (bool): If True, enable verbose logging.

    Returns:
        str: JSON string containing an array with one conversion result dict.
    """

    logging.info(f'Importing {input_filename}')
    source = create_source(input_filename)
    writer, output_ext = create_writer(output_format, verbose=verbose)
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    source.init_metadata()
    name = source.get_name()
    output_path = os.path.join(output_folder, name + output_ext)
    full_output_path = writer.write(output_path, source)
    source.close()

    if show_progress:
        print(f'Converting {input_filename} to {output_path}')

    result = {'name': name}
    if isinstance(full_output_path, list):
        full_path = full_output_path[0]
    else:
        full_path = full_output_path
    result['full_path'] = full_path
    message = f"Exported   {result['full_path']}"

    if alt_output_folder:
        if not os.path.exists(alt_output_folder):
            os.makedirs(alt_output_folder)
        alt_output_path = os.path.join(alt_output_folder, name + output_ext)
        if isinstance(full_output_path, list):
            for path in full_output_path:
                alt_output_path = os.path.join(alt_output_folder, os.path.basename(path))
                shutil.copy2(path, alt_output_path)
        elif os.path.isdir(full_output_path):
            shutil.copytree(full_output_path, alt_output_path, dirs_exist_ok=True)
        else:
            shutil.copy2(full_output_path, alt_output_path)
        result['alt_path'] = os.path.join(alt_output_folder, os.path.basename(full_path))
        message += f' and {result["alt_path"]}'

    logging.info(message)
    if show_progress:
        print(message)

    return json.dumps([result])
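
Example (illustrative): a minimal call sketch, assuming converter.py is importable and using hypothetical input/output paths that you would replace with your own.

import json
from converter import convert, init_logging

init_logging('logs/convert.log', verbose=True)      # hypothetical log path
result_json = convert('data/slide_1.isyntax', 'output/',
                      output_format='omezarr2', show_progress=True)
results = json.loads(result_json)                   # list with one dict per conversion
print(results[0]['name'], results[0]['full_path'])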

init_logging(log_filename, verbose=False)

Initialize logging to file and optionally to console.

Parameters:

Name          Type  Description                    Default
log_filename  str   Path to the log file.          required
verbose       bool  If True, also log to console.  False
Source code in converter.py
def init_logging(log_filename, verbose=False):
    """
    Initialize logging to file and optionally to console.

    Args:
        log_filename (str): Path to the log file.
        verbose (bool): If True, also log to console.
    """
    basepath = os.path.dirname(log_filename)
    if basepath and not os.path.exists(basepath):
        os.makedirs(basepath)
    handlers = [logging.FileHandler(log_filename, encoding='utf-8')]
    if verbose:
        handlers += [logging.StreamHandler()]
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s',
                        handlers=handlers,
                        encoding='utf-8')

    logging.getLogger('ome_zarr').setLevel(logging.WARNING)     # mute verbose ome_zarr logging
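
Example (illustrative): a minimal sketch assuming converter.py is importable; the log folder is created automatically if it does not exist.

import logging
from converter import init_logging

init_logging('logs/run.log', verbose=True)   # hypothetical log path
logging.info('logging initialized')          # written to the file and, with verbose=True, to the console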

DbReader

DBReader

Reads and queries a SQLite database, returning results as dictionaries.

Source code in src\DbReader.py
class DBReader:
    """
    Reads and queries a SQLite database, returning results as dictionaries.
    """

    def __init__(self, db_file):
        """
        Initialize DBReader with a database file.

        Args:
            db_file (str): Path to the SQLite database file.
        """
        self.conn = sqlite3.connect(db_file)
        self.conn.row_factory = DBReader.dict_factory

    @staticmethod
    def dict_factory(cursor, row):
        """
        Converts a database row to a dictionary.

        Args:
            cursor: SQLite cursor object.
            row: Row data.

        Returns:
            dict: Mapping column names to values.
        """
        dct = {}
        for index, column in enumerate(cursor.description):
            dct[column[0]] = row[index]
        return dct

    def fetch_all(self, query, params=(), return_dicts=True):
        """
        Executes a query and fetches all results.

        Args:
            query (str): SQL query string.
            params (tuple): Query parameters.
            return_dicts (bool): If True, returns list of dicts; else, returns first column values.

        Returns:
            list: Query results.
        """
        cursor = self.conn.cursor()
        cursor.execute(query, params)
        dct = cursor.fetchall()
        if return_dicts:
            values = dct
        else:
            values = [list(row.values())[0] for row in dct]
        return values

    def close(self):
        """
        Closes the database connection.
        """
        self.conn.close()
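
Example (illustrative): a minimal usage sketch, assuming src\DbReader.py is importable as DbReader and 'experiment.db' is a hypothetical SQLite file containing a Well table as used elsewhere in this reference.

from DbReader import DBReader

db = DBReader('experiment.db')
rows = db.fetch_all('SELECT DISTINCT Name FROM Well')                        # list of dicts
names = db.fetch_all('SELECT DISTINCT Name FROM Well', return_dicts=False)   # first-column values only
db.close()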

conn = sqlite3.connect(db_file) instance-attribute

__init__(db_file)

Initialize DBReader with a database file.

Parameters:

Name Type Description Default
db_file str

Path to the SQLite database file.

required
Source code in src\DbReader.py
def __init__(self, db_file):
    """
    Initialize DBReader with a database file.

    Args:
        db_file (str): Path to the SQLite database file.
    """
    self.conn = sqlite3.connect(db_file)
    self.conn.row_factory = DBReader.dict_factory

close()

Closes the database connection.

Source code in src\DbReader.py
def close(self):
    """
    Closes the database connection.
    """
    self.conn.close()

dict_factory(cursor, row) staticmethod

Converts a database row to a dictionary.

Parameters:

Name    Description            Default
cursor  SQLite cursor object.  required
row     Row data.              required

Returns:

Name Type Description
dict

Mapping column names to values.

Source code in src\DbReader.py
@staticmethod
def dict_factory(cursor, row):
    """
    Converts a database row to a dictionary.

    Args:
        cursor: SQLite cursor object.
        row: Row data.

    Returns:
        dict: Mapping column names to values.
    """
    dct = {}
    for index, column in enumerate(cursor.description):
        dct[column[0]] = row[index]
    return dct

fetch_all(query, params=(), return_dicts=True)

Executes a query and fetches all results.

Parameters:

Name          Type   Description                                                          Default
query         str    SQL query string.                                                    required
params        tuple  Query parameters.                                                    ()
return_dicts  bool   If True, returns list of dicts; else, returns first column values.   True

Returns:

Name Type Description
list

Query results.

Source code in src\DbReader.py
def fetch_all(self, query, params=(), return_dicts=True):
    """
    Executes a query and fetches all results.

    Args:
        query (str): SQL query string.
        params (tuple): Query parameters.
        return_dicts (bool): If True, returns list of dicts; else, returns first column values.

    Returns:
        list: Query results.
    """
    cursor = self.conn.cursor()
    cursor.execute(query, params)
    dct = cursor.fetchall()
    if return_dicts:
        values = dct
    else:
        values = [list(row.values())[0] for row in dct]
    return values

ISyntaxSource

ISyntaxSource

Bases: ImageSource

Loads image and metadata from ISyntax format files.

Source code in src\ISyntaxSource.py
class ISyntaxSource(ImageSource):
    """
    Loads image and metadata from ISyntax format files.
    """
    def init_metadata(self):
        """
        Initializes and loads metadata from the ISyntax file.

        Returns:
            dict: Metadata dictionary.
        """
        # read XML metadata header
        data = b''
        block_size = 1024 * 1024
        end_char = b'\x04'   # EOT character
        with open(self.uri, mode='rb') as file:
            done = False
            while not done:
                data_block = file.read(block_size)
                if end_char in data_block:
                    index = data_block.index(end_char)
                    data_block = data_block[:index]
                    done = True
                data += data_block

        self.metadata = xml_content_to_dict(ElementTree.XML(data.decode()))
        if 'DPUfsImport' in self.metadata:
            self.metadata = self.metadata['DPUfsImport']

        image = None
        image_type = ''
        for image0 in self.metadata.get('PIM_DP_SCANNED_IMAGES', []):
            image = image0.get('DPScannedImage', {})
            image_type = image.get('PIM_DP_IMAGE_TYPE').lower()
            if image_type in ['wsi']:
                break

        if image is not None:
            self.image_type = image_type
            nbits = image.get('UFS_IMAGE_BLOCK_HEADER_TEMPLATES', [{}])[0].get('UFSImageBlockHeaderTemplate', {}).get('DICOM_BITS_STORED', 16)
            nbits = int(np.ceil(nbits / 8)) * 8
        else:
            self.image_type = ''
            nbits = 16

        self.is_plate = 'screen' in self.image_type or 'plate' in self.image_type or 'wells' in self.image_type

        # original color channels get converted in pyisyntax package to 8-bit RGBA
        nbits = 8
        self.dim_order = 'yxc'
        self.channels = []
        self.nchannels = 4

        self.isyntax = ISyntax.open(self.uri)
        self.width, self.height = self.isyntax.dimensions
        self.shape = 1, self.nchannels, 1, self.height, self.width
        self.dtype = np.dtype(f'uint{nbits}')

        return self.metadata

    def is_screen(self):
        """
        Checks if the source is a plate/screen.

        Returns:
            bool: True if plate/screen.
        """
        return self.is_plate

    def get_shape(self):
        """
        Returns the shape of the image data.

        Returns:
            tuple: Shape of the image data.
        """
        return self.shape

    def get_data(self, well_id=None, field_id=None):
        """
        Gets image data for a specific well and field.

        Args:
            well_id (str, optional): Well identifier.
            field_id (int, optional): Field index.

        Returns:
            ndarray: Image data.
        """
        return self.isyntax.read_region(0, 0, self.width, self.height)

    def get_name(self):
        """
        Gets the file title.

        Returns:
            str: Name.
        """
        return get_filetitle(self.uri)

    def get_dim_order(self):
        """
        Returns the dimension order string.

        Returns:
            str: Dimension order.
        """
        return self.dim_order

    def get_pixel_size_um(self):
        """
        Returns the pixel size in micrometers.

        Returns:
            dict: Pixel size for x and y.
        """
        return {'x': self.isyntax.mpp_x, 'y': self.isyntax.mpp_y}

    def get_dtype(self):
        """
        Returns the numpy dtype of the image data.

        Returns:
            dtype: Numpy dtype.
        """
        return self.dtype

    def get_position_um(self, well_id=None):
        """
        Returns the position in micrometers (empty for ISyntax).

        Returns:
            dict: Empty dict.
        """
        return {}

    def get_channels(self):
        """
        Returns channel metadata.

        Returns:
            list: List of channel dicts.
        """
        return self.channels

    def get_nchannels(self):
        """
        Returns the number of channels.

        Returns:
            int: Number of channels.
        """
        return self.nchannels

    def get_rows(self):
        """
        Returns the list of row identifiers (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_columns(self):
        """
        Returns the list of column identifiers (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_wells(self):
        """
        Returns the list of well identifiers (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_time_points(self):
        """
        Returns the list of time points (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_fields(self):
        """
        Returns the list of field indices (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_acquisitions(self):
        """
        Returns acquisition metadata (empty for ISyntax).

        Returns:
            list: Empty list.
        """
        return []

    def get_total_data_size(self):
        """
        Returns the estimated total data size.

        Returns:
            int: Total data size in bytes.
        """
        total_size = np.prod(self.shape)
        if self.is_plate:
            total_size *= len(self.get_wells()) * len(self.get_fields())
        return total_size

    def close(self):
        """
        Closes the ISyntax file.
        """
        self.isyntax.close()
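
Example (illustrative): a minimal usage sketch, assuming src\ISyntaxSource.py is importable as ISyntaxSource, that the inherited ImageSource constructor takes the file path, and that 'slide.isyntax' is a hypothetical input file.

from ISyntaxSource import ISyntaxSource

source = ISyntaxSource('slide.isyntax')
source.init_metadata()
print(source.get_shape())          # (1, 4, 1, height, width)
print(source.get_pixel_size_um())  # {'x': ..., 'y': ...}
rgba = source.get_data()           # full-resolution RGBA pixel data
source.close()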

close()

Closes the ISyntax file.

Source code in src\ISyntaxSource.py
def close(self):
    """
    Closes the ISyntax file.
    """
    self.isyntax.close()

get_acquisitions()

Returns acquisition metadata (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_acquisitions(self):
    """
    Returns acquisition metadata (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

get_channels()

Returns channel metadata.

Returns:

Name Type Description
list

List of channel dicts.

Source code in src\ISyntaxSource.py
def get_channels(self):
    """
    Returns channel metadata.

    Returns:
        list: List of channel dicts.
    """
    return self.channels

get_columns()

Returns the list of column identifiers (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_columns(self):
    """
    Returns the list of column identifiers (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

get_data(well_id=None, field_id=None)

Gets image data for a specific well and field.

Parameters:

Name      Type  Description       Default
well_id   str   Well identifier.  None
field_id  int   Field index.      None

Returns:

Name Type Description
ndarray

Image data.

Source code in src\ISyntaxSource.py
def get_data(self, well_id=None, field_id=None):
    """
    Gets image data for a specific well and field.

    Args:
        well_id (str, optional): Well identifier.
        field_id (int, optional): Field index.

    Returns:
        ndarray: Image data.
    """
    return self.isyntax.read_region(0, 0, self.width, self.height)

get_dim_order()

Returns the dimension order string.

Returns:

Name Type Description
str

Dimension order.

Source code in src\ISyntaxSource.py
def get_dim_order(self):
    """
    Returns the dimension order string.

    Returns:
        str: Dimension order.
    """
    return self.dim_order

get_dtype()

Returns the numpy dtype of the image data.

Returns:

Name Type Description
dtype

Numpy dtype.

Source code in src\ISyntaxSource.py
def get_dtype(self):
    """
    Returns the numpy dtype of the image data.

    Returns:
        dtype: Numpy dtype.
    """
    return self.dtype

get_fields()

Returns the list of field indices (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_fields(self):
    """
    Returns the list of field indices (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

get_name()

Gets the file title.

Returns:

Name Type Description
str

Name.

Source code in src\ISyntaxSource.py
def get_name(self):
    """
    Gets the file title.

    Returns:
        str: Name.
    """
    return get_filetitle(self.uri)

get_nchannels()

Returns the number of channels.

Returns:

Name Type Description
int

Number of channels.

Source code in src\ISyntaxSource.py
def get_nchannels(self):
    """
    Returns the number of channels.

    Returns:
        int: Number of channels.
    """
    return self.nchannels

get_pixel_size_um()

Returns the pixel size in micrometers.

Returns:

Name Type Description
dict

Pixel size for x and y.

Source code in src\ISyntaxSource.py
def get_pixel_size_um(self):
    """
    Returns the pixel size in micrometers.

    Returns:
        dict: Pixel size for x and y.
    """
    return {'x': self.isyntax.mpp_x, 'y': self.isyntax.mpp_y}

get_position_um(well_id=None)

Returns the position in micrometers (empty for ISyntax).

Returns:

Name Type Description
dict

Empty dict.

Source code in src\ISyntaxSource.py
def get_position_um(self, well_id=None):
    """
    Returns the position in micrometers (empty for ISyntax).

    Returns:
        dict: Empty dict.
    """
    return {}

get_rows()

Returns the list of row identifiers (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_rows(self):
    """
    Returns the list of row identifiers (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

get_shape()

Returns the shape of the image data.

Returns:

Name Type Description
tuple

Shape of the image data.

Source code in src\ISyntaxSource.py
def get_shape(self):
    """
    Returns the shape of the image data.

    Returns:
        tuple: Shape of the image data.
    """
    return self.shape

get_time_points()

Returns the list of time points (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_time_points(self):
    """
    Returns the list of time points (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

get_total_data_size()

Returns the estimated total data size.

Returns:

Name Type Description
int

Total data size in bytes.

Source code in src\ISyntaxSource.py
def get_total_data_size(self):
    """
    Returns the estimated total data size.

    Returns:
        int: Total data size in bytes.
    """
    total_size = np.prod(self.shape)
    if self.is_plate:
        total_size *= len(self.get_wells()) * len(self.get_fields())
    return total_size

get_wells()

Returns the list of well identifiers (empty for ISyntax).

Returns:

Name Type Description
list

Empty list.

Source code in src\ISyntaxSource.py
def get_wells(self):
    """
    Returns the list of well identifiers (empty for ISyntax).

    Returns:
        list: Empty list.
    """
    return []

init_metadata()

Initializes and loads metadata from the ISyntax file.

Returns:

Name Type Description
dict

Metadata dictionary.

Source code in src\ISyntaxSource.py
def init_metadata(self):
    """
    Initializes and loads metadata from the ISyntax file.

    Returns:
        dict: Metadata dictionary.
    """
    # read XML metadata header
    data = b''
    block_size = 1024 * 1024
    end_char = b'\x04'   # EOT character
    with open(self.uri, mode='rb') as file:
        done = False
        while not done:
            data_block = file.read(block_size)
            if end_char in data_block:
                index = data_block.index(end_char)
                data_block = data_block[:index]
                done = True
            data += data_block

    self.metadata = xml_content_to_dict(ElementTree.XML(data.decode()))
    if 'DPUfsImport' in self.metadata:
        self.metadata = self.metadata['DPUfsImport']

    image = None
    image_type = ''
    for image0 in self.metadata.get('PIM_DP_SCANNED_IMAGES', []):
        image = image0.get('DPScannedImage', {})
        image_type = image.get('PIM_DP_IMAGE_TYPE').lower()
        if image_type in ['wsi']:
            break

    if image is not None:
        self.image_type = image_type
        nbits = image.get('UFS_IMAGE_BLOCK_HEADER_TEMPLATES', [{}])[0].get('UFSImageBlockHeaderTemplate', {}).get('DICOM_BITS_STORED', 16)
        nbits = int(np.ceil(nbits / 8)) * 8
    else:
        self.image_type = ''
        nbits = 16

    self.is_plate = 'screen' in self.image_type or 'plate' in self.image_type or 'wells' in self.image_type

    # original color channels get converted in pyisyntax package to 8-bit RGBA
    nbits = 8
    self.dim_order = 'yxc'
    self.channels = []
    self.nchannels = 4

    self.isyntax = ISyntax.open(self.uri)
    self.width, self.height = self.isyntax.dimensions
    self.shape = 1, self.nchannels, 1, self.height, self.width
    self.dtype = np.dtype(f'uint{nbits}')

    return self.metadata

is_screen()

Checks if the source is a plate/screen.

Returns:

Name Type Description
bool

True if plate/screen.

Source code in src\ISyntaxSource.py
def is_screen(self):
    """
    Checks if the source is a plate/screen.

    Returns:
        bool: True if plate/screen.
    """
    return self.is_plate

ImageDbSource

ImageDbSource

Bases: ImageSource

Loads image and metadata from a database source for high-content screening.

Source code in src\ImageDbSource.py
class ImageDbSource(ImageSource):
    """
    Loads image and metadata from a database source for high-content screening.
    """
    def __init__(self, uri, metadata={}):
        """
        Initialize ImageDbSource.

        Args:
            uri (str): Path to the database file.
            metadata (dict): Optional metadata dictionary.
        """
        super().__init__(uri, metadata)
        self.db = DBReader(self.uri)
        self.data = None
        self.data_well_id = None
        self.metadata['dim_order'] = 'tczyx'

    def init_metadata(self):
        """
        Initializes and loads metadata from the database.

        Returns:
            dict: Metadata dictionary.
        """
        self._get_time_series_info()
        self._get_experiment_metadata()
        self._get_well_info()
        self._get_image_info()
        self._get_sizes()
        return self.metadata

    def get_shape(self):
        """
        Returns the shape of the image data.

        Returns:
            tuple: Shape of the image data.
        """
        return self.shape

    def _get_time_series_info(self):
        """
        Loads time series and image file info into metadata.
        """
        time_series_ids = sorted(self.db.fetch_all('SELECT DISTINCT TimeSeriesElementId FROM SourceImageBase', return_dicts=False))
        self.metadata['time_points'] = time_series_ids

        level_ids = sorted(self.db.fetch_all('SELECT DISTINCT level FROM SourceImageBase', return_dicts=False))
        self.metadata['levels'] = level_ids

        image_files = {time_series_id: os.path.join(os.path.dirname(self.uri), f'images-{time_series_id}.db')
                       for time_series_id in time_series_ids}
        self.metadata['image_files'] = image_files

    def _get_experiment_metadata(self):
        """
        Loads experiment metadata and acquisition info into metadata.
        """
        creation_info = self.db.fetch_all('SELECT DateCreated, Creator, Name FROM ExperimentBase')[0]
        creation_info['DateCreated'] = convert_dotnet_ticks_to_datetime(creation_info['DateCreated'])
        self.metadata.update(creation_info)

        acquisitions = self.db.fetch_all('SELECT Name, Description, DateCreated, DateModified FROM AcquisitionExp')
        for acquisition in acquisitions:
            acquisition['DateCreated'] = convert_dotnet_ticks_to_datetime(acquisition['DateCreated'])
            acquisition['DateModified'] = convert_dotnet_ticks_to_datetime(acquisition['DateModified'])
        self.metadata['acquisitions'] = acquisitions

    def _get_well_info(self):
        """
        Loads well and channel information into metadata.
        """
        well_info = self.db.fetch_all('''
            SELECT SensorSizeYPixels, SensorSizeXPixels, Objective, PixelSizeUm, SensorBitness, SitesX, SitesY
            FROM AcquisitionExp, AutomaticZonesParametersExp
        ''')[0]

        # Filter multiple duplicate channel entries
        channel_infos = self.db.fetch_all('''
            SELECT DISTINCT ChannelNumber, Emission, Excitation, Dye, Color
            FROM ImagechannelExp
            ORDER BY ChannelNumber
        ''')
        self.metadata['channels'] = channel_infos
        self.metadata['num_channels'] = len(channel_infos)

        wells = self.db.fetch_all('SELECT DISTINCT Name FROM Well')
        zone_names = [well['Name'] for well in wells]
        rows = set()
        cols = set()
        for zone_name in zone_names:
            row, col = split_well_name(zone_name)
            rows.add(row)
            cols.add(col)
        well_info['rows'] = sorted(list(rows))
        well_info['columns'] = sorted(list(cols), key=lambda x: int(x))
        num_sites = well_info['SitesX'] * well_info['SitesY']
        well_info['num_sites'] = num_sites
        well_info['fields'] = list(range(num_sites))

        image_wells = self.db.fetch_all('SELECT Name, ZoneIndex, CoordX, CoordY FROM Well WHERE HasImages = 1')
        self.metadata['wells'] = dict(sorted({well['Name']: well for well in image_wells}.items(),
                                             key=lambda x: split_well_name(x[0], col_as_int=True)))

        xmax, ymax = 0, 0
        for well_id in self.metadata['wells']:
            well_data = self._read_well_info(well_id)
            xmax = max(xmax, np.max([info['CoordX'] + info['SizeX'] for info in well_data]))
            ymax = max(ymax, np.max([info['CoordY'] + info['SizeY'] for info in well_data]))
        pixel_size = well_info.get('PixelSizeUm', 1)
        well_info['max_sizex_um'] = xmax * pixel_size
        well_info['max_sizey_um'] = ymax * pixel_size

        self.metadata['well_info'] = well_info

    def _get_image_info(self):
        """
        Loads image bit depth and dtype info into metadata.
        """
        bits_per_pixel = self.db.fetch_all('SELECT DISTINCT BitsPerPixel FROM SourceImageBase', return_dicts=False)[0]
        self.metadata['bits_per_pixel'] = bits_per_pixel
        bits_per_pixel = int(np.ceil(bits_per_pixel / 8)) * 8
        if bits_per_pixel == 24:
            bits_per_pixel = 32
        self.metadata['dtype'] = np.dtype(f'uint{bits_per_pixel}')

    def _get_sizes(self):
        """
        Calculates and stores image shape and estimated data size.
        """
        well_info = self.metadata['well_info']
        nbytes = self.metadata['dtype'].itemsize
        self.shape = len(self.metadata['time_points']), self.metadata['num_channels'], 1, well_info['SensorSizeYPixels'], well_info['SensorSizeXPixels']
        max_data_size = np.prod(self.shape) * nbytes * len(self.metadata['wells']) * well_info['num_sites']
        self.metadata['max_data_size'] = max_data_size

    def _read_well_info(self, well_id, channel=None, time_point=None, level=0):
        """
        Reads image info for a specific well, optionally filtered by channel and time point.

        Args:
            well_id (str): Well identifier.
            channel (int, optional): Channel ID.
            time_point (int, optional): Time point ID.
            level (int, optional): Image level index.

        Returns:
            list: Well image info dictionaries.
        """
        well_id = strip_leading_zeros(well_id)
        well_ids = self.metadata.get('wells', {})

        if well_id not in well_ids:
            raise ValueError(f'Invalid Well: {well_id}. Available values: {well_ids}')

        zone_index = well_ids[well_id]['ZoneIndex']
        well_info = self.db.fetch_all('''
            SELECT *
            FROM SourceImageBase
            WHERE ZoneIndex = ? AND level = ?
            ORDER BY CoordX ASC, CoordY ASC
        ''', (zone_index, level))

        if channel is not None:
             well_info = [info for info in well_info if info['ChannelId'] == channel]
        if time_point is not None:
             well_info = [info for info in well_info if info['TimeSeriesElementId'] == time_point]
        if not well_info:
            raise ValueError(f'No data found for well {well_id}')
        return well_info

    def _assemble_image_data(self, well_info):
        """
        Assembles image data array using well info.

        Args:
            well_info (list): List of well image info dicts.
        """
        dtype = self.metadata['dtype']
        well_info = np.asarray(well_info)
        xmax = np.max([info['CoordX'] + info['SizeX'] for info in well_info])
        ymax = np.max([info['CoordY'] + info['SizeY'] for info in well_info])
        zmax = np.max([info.get('CoordZ', 0) + info.get('SizeZ', 1) for info in well_info])
        nc = len(set([info['ChannelId'] for info in well_info]))
        nt = len(self.metadata['time_points'])
        data = np.zeros((nt, nc, zmax, ymax, xmax), dtype=dtype)

        for timei, time_id in enumerate(self.metadata['time_points']):
            image_file = self.metadata['image_files'][time_id]
            with open(image_file, 'rb') as fid:
                for info in well_info:
                    if info['TimeSeriesElementId'] == time_id:
                        fid.seek(info['ImageIndex'])
                        coordx, coordy, coordz = info['CoordX'], info['CoordY'], info.get('CoordZ', 0)
                        sizex, sizey, sizez = info['SizeX'], info['SizeY'], info.get('SizeZ', 1)
                        channeli = info['ChannelId']
                        tile = np.fromfile(fid, dtype=dtype, count=sizez * sizey * sizex)
                        data[timei, channeli, coordz:coordz + sizez, coordy:coordy + sizey, coordx:coordx + sizex] = tile.reshape((sizez, sizey, sizex))

        self.data = data

    def _extract_site(self, site_id=None):
        """
        Extracts image data for a specific site or all sites.

        Args:
            site_id (int, optional): Site index. If None, returns all data.

        Returns:
            ndarray or list: Image data for the site(s).
        """
        well_info = self.metadata['well_info']
        sitesx = well_info['SitesX']
        sitesy = well_info['SitesY']
        sitesz = well_info.get('SitesZ', 1)
        num_sites = well_info['num_sites']
        sizex = well_info['SensorSizeXPixels']
        sizey = well_info['SensorSizeYPixels']
        sizez = well_info.get('SensorSizeZPixels', 1)

        if site_id is None:
            # Return full image data
            return self.data

        site_id = int(site_id)
        if site_id < 0:
            # Return list of all fields
            data = []
            for zi in range(sitesz):
                for yi in range(sitesy):
                    for xi in range(sitesx):
                        startx = xi * sizex
                        starty = yi * sizey
                        startz = zi * sizez
                        data.append(self.data[..., startz:startz + sizez, starty:starty + sizey, startx:startx + sizex])
            return data
        elif 0 <= site_id < num_sites:
            # Return specific site
            xi = site_id % sitesx
            yi = (site_id // sitesx) % sitesy
            zi = site_id // sitesx // sitesy
            startx = xi * sizex
            starty = yi * sizey
            startz = zi * sizez
            return self.data[..., startz:startz + sizez, starty:starty + sizey, startx:startx + sizex]
        else:
            raise ValueError(f'Invalid site: {site_id}')

    def is_screen(self):
        """
        Checks if the source is a screen (has wells).

        Returns:
            bool: True if wells exist.
        """
        return len(self.metadata['wells']) > 0

    def get_data(self, well_id=None, field_id=None):
        """
        Gets image data for a specific well and field.

        Args:
            well_id (str, optional): Well identifier.
            field_id (int, optional): Field index.

        Returns:
            ndarray: Image data.
        """
        if well_id != self.data_well_id:
            self._assemble_image_data(self._read_well_info(well_id))
            self.data_well_id = well_id
        return self._extract_site(field_id)

    def get_name(self):
        """
        Gets the experiment or file name.

        Returns:
            str: Name.
        """
        name = self.metadata.get('Name')
        if not name:
            name = splitall(os.path.splitext(self.uri)[0])[-2]
        return name

    def get_rows(self):
        """
        Returns the list of row identifiers.

        Returns:
            list: Row identifiers.
        """
        return self.metadata['well_info']['rows']

    def get_columns(self):
        """
        Returns the list of column identifiers.

        Returns:
            list: Column identifiers.
        """
        return self.metadata['well_info']['columns']

    def get_wells(self):
        """
        Returns the list of well identifiers.

        Returns:
            list: Well identifiers.
        """
        return list(self.metadata['wells'])

    def get_time_points(self):
        """
        Returns the list of time points.

        Returns:
            list: Time point IDs.
        """
        return self.metadata['time_points']

    def get_fields(self):
        """
        Returns the list of field indices.

        Returns:
            list: Field indices.
        """
        return self.metadata['well_info']['fields']

    def get_dim_order(self):
        """
        Returns the dimension order string.

        Returns:
            str: Dimension order.
        """
        return self.metadata.get('dim_order', 'tczyx')

    def get_dtype(self):
        """
        Returns the numpy dtype of the image data.

        Returns:
            dtype: Numpy dtype.
        """
        return self.metadata.get('dtype')

    def get_pixel_size_um(self):
        """
        Returns the pixel size in micrometers.

        Returns:
            dict: Pixel size for x and y.
        """
        pixel_size = self.metadata['well_info'].get('PixelSizeUm', 1)
        return {'x': pixel_size, 'y': pixel_size}

    def get_position_um(self, well_id=None):
        """
        Returns the position in micrometers for a well.

        Args:
            well_id (str): Well identifier.

        Returns:
            dict: Position in micrometers.
        """
        well = self.metadata['wells'][well_id]
        well_info = self.metadata['well_info']
        x = well.get('CoordX', 0) * well_info['max_sizex_um']
        y = well.get('CoordY', 0) * well_info['max_sizey_um']
        return {'x': x, 'y': y}

    def get_channels(self):
        """
        Returns channel metadata.

        Returns:
            list: List of channel dicts.
        """
        channels = []
        for channel0 in self.metadata['channels']:
            channel = {}
            if 'Dye' in channel0 and channel0['Dye']:
                channel['label'] = channel0['Dye']
            if 'Color' in channel0:
                channel['color'] = hexrgb_to_rgba(channel0['Color'].lstrip('#'))
            channels.append(channel)
        return channels

    def get_nchannels(self):
        """
        Returns the number of channels.

        Returns:
            int: Number of channels.
        """
        return max(self.metadata['num_channels'], 1)

    def get_acquisitions(self):
        """
        Returns acquisition metadata.

        Returns:
            list: List of acquisition dicts.
        """
        acquisitions = []
        for index, acq in enumerate(self.metadata.get('acquisitions', [])):
            acquisitions.append({
                'id': index,
                'name': acq['Name'],
                'description': acq['Description'],
                'date_created': acq['DateCreated'].isoformat(),
                'date_modified': acq['DateModified'].isoformat()
            })
        return acquisitions

    def get_total_data_size(self):
        """
        Returns the estimated total data size.

        Returns:
            int: Total data size in bytes.
        """
        return self.metadata['max_data_size']

    def print_well_matrix(self):
        """
        Returns a string representation of the well matrix.

        Returns:
            str: Well matrix.
        """
        s = ''

        well_info = self.metadata['well_info']
        rows, cols = well_info['rows'], well_info['columns']
        used_wells = [well for well in self.metadata['wells']]

        well_matrix = []
        for row_id in rows:
            row = ''
            for col_id in cols:
                well_id = f'{row_id}{col_id}'
                row += '+' if well_id in used_wells else ' '
            well_matrix.append(row)

        header = ' '.join([pad_leading_zero(col) for col in cols])
        s += ' ' + header + '\n'
        for idx, row in enumerate(well_matrix):
            s += f'{rows[idx]} ' + '  '.join(row) + '\n'
        return s

    def print_timepoint_well_matrix(self):
        """
        Returns a string representation of the timepoint-well matrix.

        Returns:
            str: Timepoint-well matrix.
        """
        s = ''

        time_points = self.metadata['time_points']
        wells = [well for well in self.metadata['wells']]

        well_matrix = []
        for timepoint in time_points:
            wells_at_timepoint = self.db.fetch_all('''
                SELECT DISTINCT Well.Name FROM SourceImageBase
                JOIN Well ON SourceImageBase.ZoneIndex = Well.ZoneIndex
                WHERE TimeSeriesElementId = ?
            ''', (timepoint,), return_dicts=False)

            row = ['+' if well in wells_at_timepoint else ' ' for well in wells]
            well_matrix.append(row)

        header = ' '.join([pad_leading_zero(well) for well in wells])
        s += 'Timepoint ' + header + '\n'
        for idx, row in enumerate(well_matrix):
            s += f'{time_points[idx]:9}  ' + '   '.join(row) + '\n'
        return s

    def close(self):
        """
        Closes the database connection.
        """
        self.db.close()
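
Example (illustrative): a minimal usage sketch, assuming src\ImageDbSource.py is importable as ImageDbSource and 'experiment/experiment.db' is a hypothetical experiment database with the expected images-<TimeSeriesElementId>.db files alongside it.

from ImageDbSource import ImageDbSource

source = ImageDbSource('experiment/experiment.db')
source.init_metadata()
print(source.print_well_matrix())               # which wells contain images
well_id = source.get_wells()[0]
field = source.get_data(well_id, field_id=0)    # tczyx array for the first field of that well
source.close()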

data = None instance-attribute

data_well_id = None instance-attribute

db = DBReader(self.uri) instance-attribute

__init__(uri, metadata={})

Initialize ImageDbSource.

Parameters:

Name      Type  Description                    Default
uri       str   Path to the database file.     required
metadata  dict  Optional metadata dictionary.  {}
Source code in src\ImageDbSource.py
def __init__(self, uri, metadata={}):
    """
    Initialize ImageDbSource.

    Args:
        uri (str): Path to the database file.
        metadata (dict): Optional metadata dictionary.
    """
    super().__init__(uri, metadata)
    self.db = DBReader(self.uri)
    self.data = None
    self.data_well_id = None
    self.metadata['dim_order'] = 'tczyx'

close()

Closes the database connection.

Source code in src\ImageDbSource.py
def close(self):
    """
    Closes the database connection.
    """
    self.db.close()

get_acquisitions()

Returns acquisition metadata.

Returns:

Name Type Description
list

List of acquisition dicts.

Source code in src\ImageDbSource.py
def get_acquisitions(self):
    """
    Returns acquisition metadata.

    Returns:
        list: List of acquisition dicts.
    """
    acquisitions = []
    for index, acq in enumerate(self.metadata.get('acquisitions', [])):
        acquisitions.append({
            'id': index,
            'name': acq['Name'],
            'description': acq['Description'],
            'date_created': acq['DateCreated'].isoformat(),
            'date_modified': acq['DateModified'].isoformat()
        })
    return acquisitions

get_channels()

Returns channel metadata.

Returns:

Name Type Description
list

List of channel dicts.

Source code in src\ImageDbSource.py
def get_channels(self):
    """
    Returns channel metadata.

    Returns:
        list: List of channel dicts.
    """
    channels = []
    for channel0 in self.metadata['channels']:
        channel = {}
        if 'Dye' in channel0 and channel0['Dye']:
            channel['label'] = channel0['Dye']
        if 'Color' in channel0:
            channel['color'] = hexrgb_to_rgba(channel0['Color'].lstrip('#'))
        channels.append(channel)
    return channels

get_columns()

Returns the list of column identifiers.

Returns:

Name Type Description
list

Column identifiers.

Source code in src\ImageDbSource.py
def get_columns(self):
    """
    Returns the list of column identifiers.

    Returns:
        list: Column identifiers.
    """
    return self.metadata['well_info']['columns']

get_data(well_id=None, field_id=None)

Gets image data for a specific well and field.

Parameters:

Name      Type  Description       Default
well_id   str   Well identifier.  None
field_id  int   Field index.      None

Returns:

Name Type Description
ndarray

Image data.

Source code in src\ImageDbSource.py
def get_data(self, well_id=None, field_id=None):
    """
    Gets image data for a specific well and field.

    Args:
        well_id (str, optional): Well identifier.
        field_id (int, optional): Field index.

    Returns:
        ndarray: Image data.
    """
    if well_id != self.data_well_id:
        self._assemble_image_data(self._read_well_info(well_id))
        self.data_well_id = well_id
    return self._extract_site(field_id)

get_dim_order()

Returns the dimension order string.

Returns:

Name Type Description
str

Dimension order.

Source code in src\ImageDbSource.py
def get_dim_order(self):
    """
    Returns the dimension order string.

    Returns:
        str: Dimension order.
    """
    return self.metadata.get('dim_order', 'tczyx')

get_dtype()

Returns the numpy dtype of the image data.

Returns:

Name Type Description
dtype

Numpy dtype.

Source code in src\ImageDbSource.py
def get_dtype(self):
    """
    Returns the numpy dtype of the image data.

    Returns:
        dtype: Numpy dtype.
    """
    return self.metadata.get('dtype')

get_fields()

Returns the list of field indices.

Returns:

Name Type Description
list

Field indices.

Source code in src\ImageDbSource.py
def get_fields(self):
    """
    Returns the list of field indices.

    Returns:
        list: Field indices.
    """
    return self.metadata['well_info']['fields']

get_name()

Gets the experiment or file name.

Returns:

Name Type Description
str

Name.

Source code in src\ImageDbSource.py
def get_name(self):
    """
    Gets the experiment or file name.

    Returns:
        str: Name.
    """
    name = self.metadata.get('Name')
    if not name:
        name = splitall(os.path.splitext(self.uri)[0])[-2]
    return name

get_nchannels()

Returns the number of channels.

Returns:

Name Type Description
int

Number of channels.

Source code in src\ImageDbSource.py
def get_nchannels(self):
    """
    Returns the number of channels.

    Returns:
        int: Number of channels.
    """
    return max(self.metadata['num_channels'], 1)

get_pixel_size_um()

Returns the pixel size in micrometers.

Returns:

Name Type Description
dict

Pixel size for x and y.

Source code in src\ImageDbSource.py
def get_pixel_size_um(self):
    """
    Returns the pixel size in micrometers.

    Returns:
        dict: Pixel size for x and y.
    """
    pixel_size = self.metadata['well_info'].get('PixelSizeUm', 1)
    return {'x': pixel_size, 'y': pixel_size}

get_position_um(well_id=None)

Returns the position in micrometers for a well.

Parameters:

Name Type Description Default
well_id str

Well identifier.

None

Returns:

Name Type Description
dict

Position in micrometers.

Source code in src\ImageDbSource.py
def get_position_um(self, well_id=None):
    """
    Returns the position in micrometers for a well.

    Args:
        well_id (str): Well identifier.

    Returns:
        dict: Position in micrometers.
    """
    well = self.metadata['wells'][well_id]
    well_info = self.metadata['well_info']
    x = well.get('CoordX', 0) * well_info['max_sizex_um']
    y = well.get('CoordY', 0) * well_info['max_sizey_um']
    return {'x': x, 'y': y}

get_rows()

Returns the list of row identifiers.

Returns:

Name Type Description
list

Row identifiers.

Source code in src\ImageDbSource.py
def get_rows(self):
    """
    Returns the list of row identifiers.

    Returns:
        list: Row identifiers.
    """
    return self.metadata['well_info']['rows']

get_shape()

Returns the shape of the image data.

Returns:

Name Type Description
tuple

Shape of the image data.

Source code in src\ImageDbSource.py
def get_shape(self):
    """
    Returns the shape of the image data.

    Returns:
        tuple: Shape of the image data.
    """
    return self.shape

get_time_points()

Returns the list of time points.

Returns:

Name Type Description
list

Time point IDs.

Source code in src\ImageDbSource.py
def get_time_points(self):
    """
    Returns the list of time points.

    Returns:
        list: Time point IDs.
    """
    return self.metadata['time_points']

get_total_data_size()

Returns the estimated total data size.

Returns:

Name Type Description
int

Total data size in bytes.

Source code in src\ImageDbSource.py
def get_total_data_size(self):
    """
    Returns the estimated total data size.

    Returns:
        int: Total data size in bytes.
    """
    return self.metadata['max_data_size']

get_wells()

Returns the list of well identifiers.

Returns:

Name Type Description
list

Well identifiers.

Source code in src\ImageDbSource.py
def get_wells(self):
    """
    Returns the list of well identifiers.

    Returns:
        list: Well identifiers.
    """
    return list(self.metadata['wells'])

init_metadata()

Initializes and loads metadata from the database.

Returns:

Name Type Description
dict

Metadata dictionary.

Source code in src\ImageDbSource.py
def init_metadata(self):
    """
    Initializes and loads metadata from the database.

    Returns:
        dict: Metadata dictionary.
    """
    self._get_time_series_info()
    self._get_experiment_metadata()
    self._get_well_info()
    self._get_image_info()
    self._get_sizes()
    return self.metadata

is_screen()

Checks if the source is a screen (has wells).

Returns:

Name Type Description
bool

True if wells exist.

Source code in src\ImageDbSource.py
def is_screen(self):
    """
    Checks if the source is a screen (has wells).

    Returns:
        bool: True if wells exist.
    """
    return len(self.metadata['wells']) > 0

print_timepoint_well_matrix()

Returns a string representation of the timepoint-well matrix.

Returns:

Name Type Description
str

Timepoint-well matrix.

Source code in src\ImageDbSource.py
def print_timepoint_well_matrix(self):
    """
    Returns a string representation of the timepoint-well matrix.

    Returns:
        str: Timepoint-well matrix.
    """
    s = ''

    time_points = self.metadata['time_points']
    wells = [well for well in self.metadata['wells']]

    well_matrix = []
    for timepoint in time_points:
        wells_at_timepoint = self.db.fetch_all('''
            SELECT DISTINCT Well.Name FROM SourceImageBase
            JOIN Well ON SourceImageBase.ZoneIndex = Well.ZoneIndex
            WHERE TimeSeriesElementId = ?
        ''', (timepoint,), return_dicts=False)

        row = ['+' if well in wells_at_timepoint else ' ' for well in wells]
        well_matrix.append(row)

    header = ' '.join([pad_leading_zero(well) for well in wells])
    s += 'Timepoint ' + header + '\n'
    for idx, row in enumerate(well_matrix):
        s += f'{time_points[idx]:9}  ' + '   '.join(row) + '\n'
    return s

print_well_matrix()

Returns a string representation of the well matrix.

Returns:

Name Type Description
str

Well matrix.

Source code in src\ImageDbSource.py
def print_well_matrix(self):
    """
    Returns a string representation of the well matrix.

    Returns:
        str: Well matrix.
    """
    s = ''

    well_info = self.metadata['well_info']
    rows, cols = well_info['rows'], well_info['columns']
    used_wells = [well for well in self.metadata['wells']]

    well_matrix = []
    for row_id in rows:
        row = ''
        for col_id in cols:
            well_id = f'{row_id}{col_id}'
            row += '+' if well_id in used_wells else ' '
        well_matrix.append(row)

    header = ' '.join([pad_leading_zero(col) for col in cols])
    s += ' ' + header + '\n'
    for idx, row in enumerate(well_matrix):
        s += f'{rows[idx]} ' + '  '.join(row) + '\n'
    return s

convert_dotnet_ticks_to_datetime(net_ticks)

Source code in src\util.py
def convert_dotnet_ticks_to_datetime(net_ticks):
    return datetime(1, 1, 1) + timedelta(microseconds=net_ticks // 10)

convert_to_um(value, unit)

Source code in src\util.py
def convert_to_um(value, unit):
    conversions = {
        'nm': 1e-3,
        'µm': 1, 'um': 1, 'micrometer': 1, 'micron': 1,
        'mm': 1e3, 'millimeter': 1e3,
        'cm': 1e4, 'centimeter': 1e4,
        'm': 1e6, 'meter': 1e6
    }
    return value * conversions.get(unit, 1)
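
Example (illustrative), assuming src\util.py is importable as util; unknown units fall back to a factor of 1.

from util import convert_to_um

convert_to_um(5, 'mm')     # 5000.0
convert_to_um(250, 'nm')   # 0.25
convert_to_um(3, 'pixel')  # 3 (unknown unit, value unchanged)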

ensure_list(item)

Source code in src\util.py
def ensure_list(item):
    if not isinstance(item, (list, tuple)):
        item = [item]
    return item

get_filetitle(filename)

Source code in src\util.py
def get_filetitle(filename):
    return os.path.basename(os.path.splitext(filename)[0])

pad_leading_zero(input_string, num_digits=2)

Source code in src\util.py
def pad_leading_zero(input_string, num_digits=2):
    output = str(input_string)
    is_well = not output[0].isdigit()
    if is_well:
        row, col = split_well_name(output, remove_leading_zeros=True)
        output = str(col)
    while len(output) < num_digits:
        output = '0' + output
    if is_well:
        output = row + output
    return output
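
Example (illustrative), assuming src\util.py is importable as util; for well names only the column part is padded.

from util import pad_leading_zero

pad_leading_zero(7)      # '07'
pad_leading_zero('A1')   # 'A01'
pad_leading_zero(123)    # '123' (already at least num_digits long)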

print_dict(value, tab=0, max_len=250, bullet=False)

Source code in src\util.py
def print_dict(value, tab=0, max_len=250, bullet=False):
    s = ''
    if isinstance(value, dict):
        for key, subvalue in value.items():
            s += '\n'
            if bullet:
                s += '-'
                bullet = False
            s += '\t' * tab + str(key) + ': '
            if isinstance(subvalue, dict):
                s += print_dict(subvalue, tab+1)
            elif isinstance(subvalue, list):
                for v in subvalue:
                    s += print_dict(v, tab+1, bullet=True)
            else:
                subvalue = str(subvalue)
                if len(subvalue) > max_len:
                    subvalue = subvalue[:max_len] + '...'
                s += subvalue
    else:
        s += str(value) + ' '
    return s

print_hbytes(nbytes)

Source code in src\util.py
def print_hbytes(nbytes):
    exps = ['', 'K', 'M', 'G', 'T', 'P', 'E']
    div = 1024
    exp = 0
    while nbytes > div:
        nbytes /= div
        exp += 1
    if exp < len(exps):
        e = exps[exp]
    else:
        e = f'e{exp * 3}'
    return f'{nbytes:.1f}{e}B'
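
For example:

print_hbytes(1536)           # '1.5KB'
print_hbytes(3 * 1024 ** 3)  # '3.0GB'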

split_well_name(well_name, remove_leading_zeros=True, col_as_int=False)

Source code in src\util.py
def split_well_name(well_name, remove_leading_zeros=True, col_as_int=False):
    matches = re.findall(r'(\D+)(\d+)', well_name)
    if len(matches) > 0:
        row, col = matches[0]
        if col_as_int or remove_leading_zeros:
            try:
                col = int(col)
            except ValueError:
                pass
        if not col_as_int:
            col = str(col)
        return row, col
    else:
        raise ValueError(f"Invalid well name format: {well_name}. Expected format like 'A1', 'B2', etc.")
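
For example:

split_well_name('A01')                              # ('A', '1')
split_well_name('A01', col_as_int=True)             # ('A', 1)
split_well_name('A01', remove_leading_zeros=False)  # ('A', '01')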

splitall(path)

Source code in src\util.py
def splitall(path):
    allparts = []
    while True:
        parts = os.path.split(path)
        if parts[0] == path:  # sentinel for absolute paths
            allparts.insert(0, parts[0])
            break
        elif parts[1] == path: # sentinel for relative paths
            allparts.insert(0, parts[1])
            break
        else:
            path = parts[0]
            allparts.insert(0, parts[1])
    return allparts
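
For example, with a POSIX-style relative path:

splitall('plate/A01/field_1.ome.tiff')  # ['plate', 'A01', 'field_1.ome.tiff']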

strip_leading_zeros(well_name)

Source code in src\util.py
def strip_leading_zeros(well_name):
    row, col = split_well_name(well_name, remove_leading_zeros=True)
    return f'{row}{col}'

xml_content_to_dict(element)

Source code in src\util.py
def xml_content_to_dict(element):
    key = element.tag
    children = list(element)
    if key == 'Array':
        res = [xml_content_to_dict(child) for child in children]
        return res
    if len(children) > 0:
        if children[0].tag == 'Array':
            value = []
        else:
            value = {}
        for child in children:
            child_value = xml_content_to_dict(child)
            if isinstance(child_value, list):
                value.extend(child_value)
            else:
                value |= child_value
    else:
        value = element.text
        if value is not None:
            if '"' in value:
                value = value.replace('"', '')
            else:
                for t in (float, int, bool):
                    try:
                        if t == bool:
                            if value.lower() == 'true':
                                value = True
                            if value.lower() == 'false':
                                value = False
                        else:
                            value = t(value)
                        break
                    except (TypeError, ValueError):
                        pass

    if key == 'DataObject':
        key = element.attrib['ObjectType']
    if key == 'Attribute':
        key = element.attrib['Name']
    return {key: value}
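
A minimal illustration of the recursive conversion; note that numeric element text is parsed, trying float first:

import xml.etree.ElementTree as ET

element = ET.fromstring('<DataObject ObjectType="Plate"><Attribute Name="Rows">8</Attribute></DataObject>')
xml_content_to_dict(element)  # {'Plate': {'Rows': 8.0}}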

ImageSource

ImageSource

Bases: ABC

Abstract base class for image sources.

Source code in src\ImageSource.py
class ImageSource(ABC):
    """
    Abstract base class for image sources.
    """

    def __init__(self, uri, metadata={}):
        """
        Initialize ImageSource.

        Args:
            uri (str): Path to the image source.
            metadata (dict): Optional metadata dictionary.
        """
        self.uri = uri
        self.metadata = metadata

    def init_metadata(self):
        """
        Initialize and load metadata.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'init_metadata' method must be implemented by subclasses.")

    def is_screen(self):
        """
        Check if the source is a screen (multi-well).

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'is_screen' method must be implemented by subclasses.")

    def get_shape(self):
        """
        Get the shape of the image data.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_shape' method must be implemented by subclasses.")

    def get_data(self, well_id=None, field_id=None):
        """
        Get image data for a well and field.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_data' method must be implemented by subclasses.")

    def get_name(self):
        """
        Get the name of the image source.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_name' method must be implemented by subclasses.")

    def get_dim_order(self):
        """
        Get the dimension order string.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_dim_order' method must be implemented by subclasses.")

    def get_dtype(self):
        """
        Get the numpy dtype of the image data.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_dtype' method must be implemented by subclasses.")

    def get_pixel_size_um(self):
        """
        Get the pixel size in micrometers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_pixel_size_um' method must be implemented by subclasses.")

    def get_position_um(self, well_id=None):
        """
        Get the position in micrometers for a well.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_position_um' method must be implemented by subclasses.")

    def get_channels(self):
        """
        Get channel metadata in NGFF format, color provided as RGBA list with values between 0 and 1
        e.g. white = [1, 1, 1, 1]

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_channels' method must be implemented by subclasses.")

    def get_nchannels(self):
        """
        Get the number of channels.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_nchannels' method must be implemented by subclasses.")

    def get_rows(self):
        """
        Get the list of row identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_rows' method must be implemented by subclasses.")

    def get_columns(self):
        """
        Get the list of column identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_columns' method must be implemented by subclasses.")

    def get_wells(self):
        """
        Get the list of well identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_wells' method must be implemented by subclasses.")

    def get_time_points(self):
        """
        Get the list of time points.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_time_points' method must be implemented by subclasses.")

    def get_fields(self):
        """
        Get the list of field indices.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_fields' method must be implemented by subclasses.")

    def get_acquisitions(self):
        """
        Get acquisition metadata.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_acquisitions' method must be implemented by subclasses.")

    def get_total_data_size(self):
        """
        Get the estimated total data size.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_total_data_size' method must be implemented by subclasses.")

    def close(self):
        """
        Close the image source.
        """
        pass
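
A minimal sketch of a concrete subclass (illustrative only; ArraySource is not part of the package, and a real source implements all abstract methods):

import numpy as np

class ArraySource(ImageSource):
    # In-memory example source used only to illustrate the interface.
    def init_metadata(self):
        self.metadata = {'name': 'example'}
        return self.metadata

    def is_screen(self):
        return False

    def get_name(self):
        return self.metadata['name']

    def get_dim_order(self):
        return 'tczyx'

    def get_dtype(self):
        return np.dtype(np.uint16)

    def get_data(self, well_id=None, field_id=None):
        return np.zeros((1, 1, 1, 256, 256), dtype=self.get_dtype())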

metadata = metadata instance-attribute

uri = uri instance-attribute

__init__(uri, metadata={})

Initialize ImageSource.

Parameters:

Name Type Description Default
uri str

Path to the image source.

required
metadata dict

Optional metadata dictionary.

{}
Source code in src\ImageSource.py
def __init__(self, uri, metadata={}):
    """
    Initialize ImageSource.

    Args:
        uri (str): Path to the image source.
        metadata (dict): Optional metadata dictionary.
    """
    self.uri = uri
    self.metadata = metadata

close()

Close the image source.

Source code in src\ImageSource.py
def close(self):
    """
    Close the image source.
    """
    pass

get_acquisitions()

Get acquisition metadata.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_acquisitions(self):
    """
    Get acquisition metadata.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_acquisitions' method must be implemented by subclasses.")

get_channels()

Get channel metadata in NGFF format; colors are provided as RGBA lists with values between 0 and 1, e.g. white = [1, 1, 1, 1].

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_channels(self):
    """
    Get channel metadata in NGFF format, color provided as RGBA list with values between 0 and 1
    e.g. white = [1, 1, 1, 1]

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_channels' method must be implemented by subclasses.")

get_columns()

Get the list of column identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_columns(self):
    """
    Get the list of column identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_columns' method must be implemented by subclasses.")

get_data(well_id=None, field_id=None)

Get image data for a well and field.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_data(self, well_id=None, field_id=None):
    """
    Get image data for a well and field.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_data' method must be implemented by subclasses.")

get_dim_order()

Get the dimension order string.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_dim_order(self):
    """
    Get the dimension order string.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_dim_order' method must be implemented by subclasses.")

get_dtype()

Get the numpy dtype of the image data.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_dtype(self):
    """
    Get the numpy dtype of the image data.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_dtype' method must be implemented by subclasses.")

get_fields()

Get the list of field indices.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_fields(self):
    """
    Get the list of field indices.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_fields' method must be implemented by subclasses.")

get_name()

Get the name of the image source.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_name(self):
    """
    Get the name of the image source.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_name' method must be implemented by subclasses.")

get_nchannels()

Get the number of channels.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_nchannels(self):
    """
    Get the number of channels.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_nchannels' method must be implemented by subclasses.")

get_pixel_size_um()

Get the pixel size in micrometers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_pixel_size_um(self):
    """
    Get the pixel size in micrometers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_pixel_size_um' method must be implemented by subclasses.")

get_position_um(well_id=None)

Get the position in micrometers for a well.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_position_um(self, well_id=None):
    """
    Get the position in micrometers for a well.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_position_um' method must be implemented by subclasses.")

get_rows()

Get the list of row identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_rows(self):
    """
    Get the list of row identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_rows' method must be implemented by subclasses.")

get_shape()

Get the shape of the image data.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_shape(self):
    """
    Get the shape of the image data.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_shape' method must be implemented by subclasses.")

get_time_points()

Get the list of time points.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_time_points(self):
    """
    Get the list of time points.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_time_points' method must be implemented by subclasses.")

get_total_data_size()

Get the estimated total data size.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_total_data_size(self):
    """
    Get the estimated total data size.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_total_data_size' method must be implemented by subclasses.")

get_wells()

Get the list of well identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def get_wells(self):
    """
    Get the list of well identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_wells' method must be implemented by subclasses.")

init_metadata()

Initialize and load metadata.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def init_metadata(self):
    """
    Initialize and load metadata.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'init_metadata' method must be implemented by subclasses.")

is_screen()

Check if the source is a screen (multi-well).

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src\ImageSource.py
def is_screen(self):
    """
    Check if the source is a screen (multi-well).

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'is_screen' method must be implemented by subclasses.")

OmeTiffWriter

OmeTiffWriter

Bases: OmeWriter

Writes image data and metadata to OME-TIFF files.

Source code in src\OmeTiffWriter.py
class OmeTiffWriter(OmeWriter):
    """
    Writes image data and metadata to OME-TIFF files.
    """
    def __init__(self, verbose=False):
        """
        Initialize OmeTiffWriter.

        Args:
            verbose (bool): If True, prints progress info.
        """
        super().__init__()
        self.verbose = verbose

    def write(self, filepath, source, **kwargs):
        """
        Writes image or screen data to OME-TIFF files.

        Args:
            filepath (str): Output file path.
            source (ImageSource): Source object.
            **kwargs: Additional options.

        Returns:
            str or list: Output file path(s).
        """
        if source.is_screen():
            filepath, total_size = self._write_screen(filepath, source, **kwargs)
        else:
            filepath, total_size = self._write_image(filepath, source, **kwargs)

        if self.verbose:
            print(f'Total data written: {print_hbytes(total_size)}')

        return filepath

    def _write_screen(self, filename, source, **kwargs):
        """
        Writes multi-well screen data to separate TIFF files and companion metadata.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            **kwargs: Additional options.

        Returns:
            tuple: (List of output paths, total data size)
        """
        # writes separate tiff files for each field, and separate metadata companion file
        output_paths = []
        filepath, filename = os.path.split(filename)
        filetitle = os.path.splitext(filename)[0].rstrip('.ome')

        companion_filename = os.path.join(filepath, filetitle + '.companion.ome')
        companion_uuid = create_uuid()

        total_size = 0
        image_uuids = []
        image_filenames = []
        for well_id in source.get_wells():
            for field in source.get_fields():
                resolution, resolution_unit = create_resolution_metadata(source)
                data = source.get_data(well_id, field)

                filename = f'{filetitle}'
                filename += f'_{pad_leading_zero(well_id)}'
                if field is not None:
                    filename += f'_{pad_leading_zero(field)}'
                filename = os.path.join(filepath, filename + '.ome.tiff')
                xml_metadata, image_uuid = create_binaryonly_metadata(os.path.basename(companion_filename), companion_uuid)

                size = self._write_tiff(filename, source, data,
                                        resolution=resolution, resolution_unit=resolution_unit,
                                        tile_size=kwargs.get('tile_size'), compression=kwargs.get('compression'),
                                        xml_metadata=xml_metadata, pyramid_levels=4)

                image_uuids.append(image_uuid)
                image_filenames.append(os.path.basename(filename))
                output_paths.append(filename)
                total_size += size

        xml_metadata = create_metadata(source, companion_uuid, image_uuids, image_filenames)
        with open(companion_filename, 'wb') as file:
            file.write(xml_metadata.encode())

        output_paths = [companion_filename] + output_paths
        return output_paths, total_size

    def _write_image(self, filename, source, **kwargs):
        """
        Writes single image data to a TIFF file.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            **kwargs: Additional options.

        Returns:
            tuple: (Output path, data size)
        """
        xml_metadata, _ = create_metadata(source)
        resolution, resolution_unit = create_resolution_metadata(source)
        data = source.get_data()

        size = self._write_tiff(filename, source, data,
                                resolution=resolution, resolution_unit=resolution_unit,
                                tile_size=kwargs.get('tile_size'), compression=kwargs.get('compression'),
                                xml_metadata=xml_metadata, pyramid_levels=4)

        return filename, size

    def _write_tiff(self, filename, source, data,
                  resolution=None, resolution_unit=None, tile_size=None, compression=None,
                  xml_metadata=None, pyramid_levels=0, pyramid_scale=2):
        """
        Writes image data to a TIFF file with optional pyramids and metadata.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            data (ndarray): Image data.
            resolution (tuple, optional): Pixel resolution.
            resolution_unit (str, optional): Resolution unit.
            tile_size (int or tuple, optional): Tile size.
            compression (str, optional): Compression type.
            xml_metadata (str, optional): OME-XML metadata.
            pyramid_levels (int): Number of pyramid levels.
            pyramid_scale (int): Pyramid downscale factor.

        Returns:
            int: Data size in bytes.
        """
        dim_order = source.get_dim_order()
        shape = data.shape
        x_index = dim_order.index('x')
        y_index = dim_order.index('y')
        size = shape[x_index], shape[y_index]
        source_type = source.get_dtype()

        if tile_size is not None and isinstance(tile_size, int):
            tile_size = [tile_size] * 2

        if resolution is not None:
            # tifffile only supports x/y pyramid resolution
            resolution = tuple(resolution[0:2])

        if xml_metadata is not None:
            # set ome=False to provide custom OME xml in description
            xml_metadata_bytes = xml_metadata.encode()
            is_ome = False
        else:
            xml_metadata_bytes = None
            is_ome = True

        # maximum size (w/o compression)
        max_size = data.size * data.itemsize
        base_size = np.divide(max_size, np.prod(size))
        scale = 1
        for level in range(pyramid_levels):
            max_size += np.prod(size) * scale * base_size
            scale /= pyramid_scale
        bigtiff = (max_size > 2 ** 32)

        size = data.size * data.itemsize
        with TiffWriter(filename, bigtiff=bigtiff, ome=is_ome) as writer:
            for level in range(pyramid_levels + 1):
                if level == 0:
                    scale = 1
                    subifds = pyramid_levels
                    subfiletype = None
                else:
                    scale /= pyramid_scale
                    new_shape = list(shape)
                    new_shape[x_index] = int(shape[x_index] * scale)
                    new_shape[y_index] = int(shape[y_index] * scale)
                    data = resize(data, new_shape, preserve_range=True)
                    subifds = None
                    subfiletype = 1
                    xml_metadata_bytes = None
                writer.write(data.astype(source_type), subifds=subifds, subfiletype=subfiletype,
                             resolution=resolution, resolutionunit=resolution_unit, tile=tile_size,
                             compression=compression, description=xml_metadata_bytes)
        return size
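
Illustrative usage (a sketch: source is assumed to be an initialized ImageSource; tile_size and compression are the keyword options read by _write_tiff):

writer = OmeTiffWriter(verbose=True)
output = writer.write('output/image.ome.tiff', source, tile_size=256, compression='zlib')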

verbose = verbose instance-attribute

__init__(verbose=False)

Initialize OmeTiffWriter.

Parameters:

Name Type Description Default
verbose bool

If True, prints progress info.

False
Source code in src\OmeTiffWriter.py
def __init__(self, verbose=False):
    """
    Initialize OmeTiffWriter.

    Args:
        verbose (bool): If True, prints progress info.
    """
    super().__init__()
    self.verbose = verbose

write(filepath, source, **kwargs)

Writes image or screen data to OME-TIFF files.

Parameters:

Name Type Description Default
filepath str

Output file path.

required
source ImageSource

Source object.

required
**kwargs

Additional options.

{}

Returns:

Type Description

str or list: Output file path(s).

Source code in src\OmeTiffWriter.py
def write(self, filepath, source, **kwargs):
    """
    Writes image or screen data to OME-TIFF files.

    Args:
        filepath (str): Output file path.
        source (ImageSource): Source object.
        **kwargs: Additional options.

    Returns:
        str or list: Output file path(s).
    """
    if source.is_screen():
        filepath, total_size = self._write_screen(filepath, source, **kwargs)
    else:
        filepath, total_size = self._write_image(filepath, source, **kwargs)

    if self.verbose:
        print(f'Total data written: {print_hbytes(total_size)}')

    return filepath

OmeWriter

OmeWriter

Bases: ABC

Abstract base class for OME writers.

Source code in src\OmeWriter.py
class OmeWriter(ABC):
    """
    Abstract base class for OME writers.
    """

    def write(self, filepath, source, verbose=False, **kwargs) -> str:
        """
        Write image data and metadata to output.

        Args:
            filepath (str): Output file path.
            source (ImageSource): Source object.
            verbose (bool): If True, prints progress info.
            **kwargs: Additional options.

        Returns:
            str: Output file path(s).
        """
        # Expect to return output path (or filepath)
        raise NotImplementedError("This method should be implemented by subclasses.")

write(filepath, source, verbose=False, **kwargs)

Write image data and metadata to output.

Parameters:

Name Type Description Default
filepath str

Output file path.

required
source ImageSource

Source object.

required
verbose bool

If True, prints progress info.

False
**kwargs

Additional options.

{}

Returns:

Name Type Description
str str

Output file path(s).

Source code in src\OmeWriter.py
def write(self, filepath, source, verbose=False, **kwargs) -> str:
    """
    Write image data and metadata to output.

    Args:
        filepath (str): Output file path.
        source (ImageSource): Source object.
        verbose (bool): If True, prints progress info.
        **kwargs: Additional options.

    Returns:
        str: Output file path(s).
    """
    # Expect to return output path (or filepath)
    raise NotImplementedError("This method should be implemented by subclasses.")

OmeZarrWriter

OmeZarrWriter

Bases: OmeWriter

Source code in src\OmeZarrWriter.py
class OmeZarrWriter(OmeWriter):
    def __init__(self, zarr_version=2, ome_version='0.4', verbose=False):
        super().__init__()
        self.zarr_version = zarr_version
        self.ome_version = ome_version
        if ome_version == '0.4':
            from ome_zarr.format import FormatV04
            self.ome_format = FormatV04()
        elif ome_version == '0.5':
            from ome_zarr.format import FormatV05
            self.ome_format = FormatV05()
        else:
            self.ome_format = None
        self.verbose = verbose

    def write(self, filepath, source, **kwargs):
        if source.is_screen():
            zarr_root, total_size = self._write_screen(filepath, source, **kwargs)
        else:
            zarr_root, total_size = self._write_image(filepath, source)

        zarr_root.attrs['_creator'] = {'name': 'nl.biomero.OmeZarrWriter', 'version': VERSION}

        if self.verbose:
            print(f'Total data written: {print_hbytes(total_size)}')

        return filepath

    def _write_screen(self, filepath, source, **kwargs):
        #zarr_location = parse_url(filename, mode='w', fmt=self.ome_format)
        zarr_location = filepath
        zarr_root = zarr.open_group(zarr_location, mode='w', zarr_version=self.zarr_version)

        row_names = source.get_rows()
        col_names = source.get_columns()
        wells = source.get_wells()
        well_paths = ['/'.join(split_well_name(well)) for well in wells]
        fields = list(map(str, source.get_fields()))

        acquisitions = source.get_acquisitions()
        name = source.get_name()
        write_plate_metadata(zarr_root, row_names, col_names, well_paths,
                             name=name, field_count=len(fields), acquisitions=acquisitions,
                             fmt=self.ome_format)
        total_size = 0
        for well_id in wells:
            row, col = split_well_name(well_id)
            row_group = zarr_root.require_group(str(row))
            well_group = row_group.require_group(str(col))
            write_well_metadata(well_group, fields, fmt=self.ome_format)
            position = source.get_position_um(well_id)
            for field in fields:
                image_group = well_group.require_group(field)
                data = source.get_data(well_id, field)
                size = self._write_data(image_group, data, source, position)
                total_size += size

        return zarr_root, total_size

    def _write_image(self, filepath, source):
        #zarr_location = parse_url(filename, mode='w', fmt=self.ome_format)
        zarr_location = filepath
        zarr_root = zarr.open_group(zarr_location, mode='w', zarr_version=self.zarr_version)

        data = source.get_data()
        size = self._write_data(zarr_root, data, source)
        return zarr_root, size

    def _write_data(self, group, data, source, position=None):
        dim_order = source.get_dim_order()
        if dim_order[-1] == 'c':
            dim_order = 'c' + dim_order[:-1]
            data = np.moveaxis(data, -1, 0)
        axes = create_axes_metadata(dim_order)
        pixel_size_scales, scaler = self._create_scale_metadata(source, dim_order, position)

        if self.zarr_version >= 3:
            shards = []
            chunks = []
            # TODO: don't redefine chunks for dask/+ arrays
            for dim, n in zip(dim_order, data.shape):
                if dim in 'xy':
                    shards += [10240]
                    chunks += [1024]
                else:
                    shards += [1]
                    chunks += [1]
            storage_options = {'chunks': chunks, 'shards': shards}
        else:
            storage_options = None

        size = data.size * data.itemsize
        write_image(image=data, group=group, axes=axes, coordinate_transformations=pixel_size_scales,
                    scaler=scaler, fmt=self.ome_format, storage_options=storage_options)

        dtype = source.get_dtype()
        channels = source.get_channels()
        nchannels = source.get_nchannels()
        window_scanner = WindowScanner()
        window_scanner.process(data, dim_order)
        window = window_scanner.get_window()
        group.attrs['omero'] = create_channel_metadata(dtype, channels, nchannels, window, self.ome_version)
        return size

    def _create_scale_metadata(self, source, dim_order, translation, scaler=None):
        if scaler is None:
            scaler = Scaler()
        pixel_size_scales = []
        scale = 1
        for i in range(scaler.max_layer + 1):
            pixel_size_scales.append(
                create_transformation_metadata(dim_order, source.get_pixel_size_um(),
                                               scale, translation))
            scale /= scaler.downscale
        return pixel_size_scales, scaler
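
Illustrative usage (a sketch; source is assumed to be an initialized ImageSource):

writer = OmeZarrWriter(zarr_version=2, ome_version='0.4', verbose=True)
writer.write('output/plate.ome.zarr', source)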

ome_format = FormatV04() instance-attribute

ome_version = ome_version instance-attribute

verbose = verbose instance-attribute

zarr_version = zarr_version instance-attribute

__init__(zarr_version=2, ome_version='0.4', verbose=False)

Source code in src\OmeZarrWriter.py
def __init__(self, zarr_version=2, ome_version='0.4', verbose=False):
    super().__init__()
    self.zarr_version = zarr_version
    self.ome_version = ome_version
    if ome_version == '0.4':
        from ome_zarr.format import FormatV04
        self.ome_format = FormatV04()
    elif ome_version == '0.5':
        from ome_zarr.format import FormatV05
        self.ome_format = FormatV05()
    else:
        self.ome_format = None
    self.verbose = verbose

write(filepath, source, **kwargs)

Source code in src\OmeZarrWriter.py
def write(self, filepath, source, **kwargs):
    if source.is_screen():
        zarr_root, total_size = self._write_screen(filepath, source, **kwargs)
    else:
        zarr_root, total_size = self._write_image(filepath, source)

    zarr_root.attrs['_creator'] = {'name': 'nl.biomero.OmeZarrWriter', 'version': VERSION}

    if self.verbose:
        print(f'Total data written: {print_hbytes(total_size)}')

    return filepath

create_axes_metadata(dimension_order)

Create axes metadata for OME-Zarr from dimension order.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required

Returns:

Name Type Description
list

List of axis metadata dictionaries.

Source code in src\ome_zarr_util.py
def create_axes_metadata(dimension_order):
    """
    Create axes metadata for OME-Zarr from dimension order.

    Args:
        dimension_order (str): String of dimension characters.

    Returns:
        list: List of axis metadata dictionaries.
    """
    axes = []
    for dimension in dimension_order:
        unit1 = None
        if dimension == 't':
            type1 = 'time'
            unit1 = 'millisecond'
        elif dimension == 'c':
            type1 = 'channel'
        else:
            type1 = 'space'
            unit1 = 'micrometer'
        axis = {'name': dimension, 'type': type1}
        if unit1 is not None and unit1 != '':
            axis['unit'] = unit1
        axes.append(axis)
    return axes
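
For example:

create_axes_metadata('tcyx')
# [{'name': 't', 'type': 'time', 'unit': 'millisecond'},
#  {'name': 'c', 'type': 'channel'},
#  {'name': 'y', 'type': 'space', 'unit': 'micrometer'},
#  {'name': 'x', 'type': 'space', 'unit': 'micrometer'}]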

create_channel_metadata(dtype, channels, nchannels, window, ome_version)

Create channel metadata for OME-Zarr.

Parameters:

Name Type Description Default
dtype

Numpy dtype of image data.

required
channels list

List of channel dicts.

required
nchannels int

Number of channels.

required
window tuple

Min/max window values.

required
ome_version str

OME-Zarr version.

required

Returns:

Name Type Description
dict

Channel metadata dictionary.

Source code in src\ome_zarr_util.py
def create_channel_metadata(dtype, channels, nchannels, window, ome_version):
    """
    Create channel metadata for OME-Zarr.

    Args:
        dtype: Numpy dtype of image data.
        channels (list): List of channel dicts.
        nchannels (int): Number of channels.
        window (tuple): Min/max window values.
        ome_version (str): OME-Zarr version.

    Returns:
        dict: Channel metadata dictionary.
    """
    if len(channels) < nchannels:
        labels = []
        colors = []
        if nchannels in (3, 4):
            labels = ['Red', 'Green', 'Blue']
            colors = ["FF0000", "00FF00", "0000FF"]
        if nchannels == 4:
            labels += ['Alpha']
            colors += ["FFFFFF"]
        channels = [{'label': label, 'color': color} for label, color in zip(labels, colors)]

    omezarr_channels = []
    start, end = window
    for channeli, channel in enumerate(channels):
        omezarr_channel = {'label': channel.get('label', channel.get('Name', f'{channeli}')), 'active': True}
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            omezarr_channel['color'] = rgba_to_hexrgb(color)
        if dtype.kind == 'f':
            min, max = 0, 1
        else:
            info = np.iinfo(dtype)
            min, max = info.min, info.max
        omezarr_channel['window'] = {'min': min, 'max': max, 'start': start[channeli], 'end': end[channeli]}
        omezarr_channels.append(omezarr_channel)

    metadata = {
        'version': ome_version,
        'channels': omezarr_channels,
    }
    return metadata
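
An illustrative call for a single 16-bit channel; window is a (start values, end values) pair with one entry per channel:

import numpy as np

channels = [{'label': 'DAPI', 'color': [0, 0, 1, 1]}]
create_channel_metadata(np.dtype(np.uint16), channels, 1, ([0], [4095]), '0.4')
# {'version': '0.4',
#  'channels': [{'label': 'DAPI', 'active': True, 'color': '0000FF',
#                'window': {'min': 0, 'max': 65535, 'start': 0, 'end': 4095}}]}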

create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um={})

Create transformation metadata (scale and translation) for OME-Zarr.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required
pixel_size_um dict

Pixel size in micrometers per dimension.

required
scale float

Scaling factor.

required
translation_um dict

Translation in micrometers per dimension.

{}

Returns:

Name Type Description
list

List of transformation metadata dictionaries.

Source code in src\ome_zarr_util.py
def create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um={}):
    """
    Create transformation metadata (scale and translation) for OME-Zarr.

    Args:
        dimension_order (str): String of dimension characters.
        pixel_size_um (dict): Pixel size in micrometers per dimension.
        scale (float): Scaling factor.
        translation_um (dict, optional): Translation in micrometers per dimension.

    Returns:
        list: List of transformation metadata dictionaries.
    """
    metadata = []
    pixel_size_scale = []
    translation_scale = []
    for dimension in dimension_order:
        pixel_size_scale1 = pixel_size_um.get(dimension, 1)
        if pixel_size_scale1 == 0:
            pixel_size_scale1 = 1
        if dimension in ['x', 'y']:
            pixel_size_scale1 /= scale
        pixel_size_scale.append(pixel_size_scale1)

        translation1 = translation_um.get(dimension, 0)
        if dimension in ['x', 'y']:
            translation1 *= scale
        translation_scale.append(translation1)

    metadata.append({'type': 'scale', 'scale': pixel_size_scale})
    if not all(v == 0 for v in translation_scale):
        metadata.append({'type': 'translation', 'translation': translation_scale})
    return metadata
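
For example, for a z-stack with 0.65 µm pixels and 2 µm z-spacing at full resolution:

create_transformation_metadata('zyx', {'x': 0.65, 'y': 0.65, 'z': 2.0}, scale=1)
# [{'type': 'scale', 'scale': [2.0, 0.65, 0.65]}]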

rgba_to_hexrgb(rgba)

Source code in src\color_conversion.py
def rgba_to_hexrgb(rgba: list) -> str:
    hexrgb = ''.join([hex(int(x * 255))[2:].upper().zfill(2) for x in rgba[:3]])
    return hexrgb
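
For example:

rgba_to_hexrgb([1, 0, 0, 1])        # 'FF0000'
rgba_to_hexrgb([0.5, 0.5, 0.5, 1])  # '7F7F7F'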

scale_dimensions_dict(shape0, scale)

Scale x and y dimensions in a shape dictionary.

Parameters:

Name Type Description Default
shape0 dict

Original shape dictionary.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
dict

Scaled shape dictionary.

Source code in src\ome_zarr_util.py
def scale_dimensions_dict(shape0, scale):
    """
    Scale x and y dimensions in a shape dictionary.

    Args:
        shape0 (dict): Original shape dictionary.
        scale (float): Scaling factor.

    Returns:
        dict: Scaled shape dictionary.
    """
    shape = {}
    if scale == 1:
        return shape0
    for dimension, shape1 in shape0.items():
        if dimension[0] in ['x', 'y']:
            shape1 = int(shape1 * scale)
        shape[dimension] = shape1
    return shape

scale_dimensions_xy(shape0, dimension_order, scale)

Scale x and y dimensions in a shape tuple.

Parameters:

Name Type Description Default
shape0 tuple

Original shape.

required
dimension_order str

String of dimension characters.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
list

Scaled shape.

Source code in src\ome_zarr_util.py
def scale_dimensions_xy(shape0, dimension_order, scale):
    """
    Scale x and y dimensions in a shape tuple.

    Args:
        shape0 (tuple): Original shape.
        dimension_order (str): String of dimension characters.
        scale (float): Scaling factor.

    Returns:
        list: Scaled shape.
    """
    shape = []
    if scale == 1:
        return shape0
    for shape1, dimension in zip(shape0, dimension_order):
        if dimension[0] in ['x', 'y']:
            shape1 = int(shape1 * scale)
        shape.append(shape1)
    return shape
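
For example:

scale_dimensions_xy((1, 3, 2048, 2048), 'tcyx', 0.5)  # [1, 3, 1024, 1024]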

TiffSource

TiffSource

Bases: ImageSource

Loads image and metadata from TIFF or OME-TIFF files.
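
A typical usage sketch (illustrative; the file name is a placeholder):

source = TiffSource('image.ome.tiff')
source.init_metadata()
data = source.get_data()
pixel_size = source.get_pixel_size_um()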

Source code in src\TiffSource.py
class TiffSource(ImageSource):
    """
    Loads image and metadata from TIFF or OME-TIFF files.
    """
    def __init__(self, uri, metadata={}):
        """
        Initialize TiffSource.

        Args:
            uri (str): Path to the TIFF file.
            metadata (dict): Optional metadata dictionary.
        """
        super().__init__(uri, metadata)
        image_filename = None
        ext = os.path.splitext(uri)[1].lower()
        if 'tif' in ext:
            image_filename = uri
        elif 'ome' in ext:
            # read metadata
            with open(uri, 'rb') as file:
                self.metadata = metadata_to_dict(file.read().decode())
            # try to open first ome-tiff file
            filename = ensure_list(self.metadata.get('Image', {}))[0].get('Pixels', {}).get('TiffData', {}).get('UUID', {}).get('FileName')
            if filename:
                image_filename = os.path.join(os.path.dirname(uri), filename)
        else:
            raise RuntimeError(f'Unsupported tiff extension: {ext}')

        self.tiff = TiffFile(image_filename)

    def init_metadata(self):
        """
        Initializes and loads metadata from the (OME) TIFF file.

        Returns:
            dict: Metadata dictionary.
        """
        self.is_ome = self.tiff.is_ome
        self.is_imagej = self.tiff.is_imagej
        pixel_size = {'x': 1, 'y': 1}
        position = {}
        channels = []
        if self.is_ome:
            metadata = metadata_to_dict(self.tiff.ome_metadata)
            if metadata and not 'BinaryOnly' in metadata:
                self.metadata = metadata
            self.is_plate = 'Plate' in self.metadata
            if self.is_plate:
                plate = self.metadata['Plate']
                self.name = plate.get('Name')
                rows = set()
                columns = set()
                wells = {}
                for well in plate['Well']:
                    row = create_col_row_label(well['Row'], plate['RowNamingConvention'])
                    column = create_col_row_label(well['Column'], plate['ColumnNamingConvention'])
                    rows.add(row)
                    columns.add(column)
                    label = f'{row}{column}'
                    wells[label] = well['ID']
                self.rows = sorted(rows)
                self.columns = list(columns)
                self.wells = list(wells.keys())
            else:
                self.name = self.metadata['Image'].get('Name')
            pixels = ensure_list(self.metadata.get('Image', []))[0].get('Pixels', {})
            self.shape = pixels.get('SizeT'), pixels.get('SizeC'), pixels.get('SizeZ'), pixels.get('SizeY'), pixels.get('SizeX')
            self.dim_order = ''.join(reversed(pixels['DimensionOrder'].lower()))
            self.dtype = np.dtype(pixels['Type'])
            if 'PhysicalSizeX' in pixels:
                pixel_size['x'] = convert_to_um(float(pixels.get('PhysicalSizeX')), pixels.get('PhysicalSizeXUnit'))
            if 'PhysicalSizeY' in pixels:
                pixel_size['y'] = convert_to_um(float(pixels.get('PhysicalSizeY')), pixels.get('PhysicalSizeYUnit'))
            if 'PhysicalSizeZ' in pixels:
                pixel_size['z'] = convert_to_um(float(pixels.get('PhysicalSizeZ')), pixels.get('PhysicalSizeZUnit'))
            plane = pixels.get('Plane')
            if plane:
                if 'PositionX' in plane:
                    position['x'] = convert_to_um(float(plane.get('PositionX')), plane.get('PositionXUnit'))
                if 'PositionY' in plane:
                    position['y'] = convert_to_um(float(plane.get('PositionY')), plane.get('PositionYUnit'))
                if 'PositionZ' in plane:
                    position['z'] = convert_to_um(float(plane.get('PositionZ')), plane.get('PositionZUnit'))
            for channel0 in ensure_list(pixels.get('Channel')):
                channel = {}
                if 'Name' in channel0:
                    channel['label'] = channel0['Name']
                if 'Color' in channel0:
                    channel['color'] = int_to_rgba(channel0['Color'])
                channels.append(channel)
        else:
            self.is_plate = False
            if self.is_imagej:
                self.imagej_metadata = self.tiff.imagej_metadata
                pixel_size_unit = self.imagej_metadata.get('unit', '').encode().decode('unicode_escape')
                if 'scales' in self.imagej_metadata:
                    for dim, scale in zip(['x', 'y'], self.imagej_metadata['scales'].split(',')):
                        scale = scale.strip()
                        if scale != '':
                            pixel_size[dim] = convert_to_um(float(scale), pixel_size_unit)
                if 'spacing' in self.imagej_metadata:
                    pixel_size['z'] = convert_to_um(self.imagej_metadata['spacing'], pixel_size_unit)
            self.metadata = tags_to_dict(self.tiff.pages.first.tags)
            self.name = os.path.splitext(self.tiff.filename)[0]
            if self.tiff.series:
                page = self.tiff.series[0]
            else:
                page = self.tiff.pages.first
            self.shape = page.shape
            while len(self.shape) < 5:
                self.shape = tuple([1] + list(self.shape))
            self.dim_order = page.axes.lower().replace('s', 'c').replace('r', '')
            self.dtype = page.dtype
            res_unit = self.metadata.get('ResolutionUnit', '')
            if isinstance(res_unit, Enum):
                res_unit = res_unit.name
            res_unit = res_unit.lower()
            if res_unit == 'none':
                res_unit = ''
            if 'x' not in pixel_size:
                res0 = convert_rational_value(self.metadata.get('XResolution'))
                if res0 is not None and res0 != 0:
                    pixel_size['x'] = convert_to_um(1 / res0, res_unit)
            if 'y' not in pixel_size:
                res0 = convert_rational_value(self.metadata.get('YResolution'))
                if res0 is not None and res0 != 0:
                    pixel_size['y'] = convert_to_um(1 / res0, res_unit)
        self.pixel_size = pixel_size
        self.position = position
        self.channels = channels
        return self.metadata

    def is_screen(self):
        """
        Checks if the source is a plate/screen.

        Returns:
            bool: True if plate/screen.
        """
        return self.is_plate

    def get_shape(self):
        """
        Returns the shape of the image data.

        Returns:
            tuple: Shape of the image data.
        """
        return self.shape

    def get_data(self, well_id=None, field_id=None):
        """
        Gets image data from the TIFF file.

        Returns:
            ndarray: Image data.
        """
        data = self.tiff.asarray()
        while data.ndim < len(self.dim_order):
            data = np.expand_dims(data, 0)
        return data

    def get_name(self):
        """
        Gets the image or plate name.

        Returns:
            str: Name.
        """
        return self.name

    def get_dim_order(self):
        """
        Returns the dimension order string.

        Returns:
            str: Dimension order.
        """
        return self.dim_order

    def get_dtype(self):
        """
        Returns the numpy dtype of the image data.

        Returns:
            dtype: Numpy dtype.
        """
        return self.dtype

    def get_pixel_size_um(self):
        """
        Returns the pixel size in micrometers.

        Returns:
            dict: Pixel size for x, y, (and z).
        """
        return self.pixel_size

    def get_position_um(self, well_id=None):
        """
        Returns the position in micrometers.

        Returns:
            dict: Position in micrometers.
        """
        return self.position

    def get_channels(self):
        """
        Returns channel metadata.

        Returns:
            list: List of channel dicts.
        """
        return self.channels

    def get_nchannels(self):
        """
        Returns the number of channels.

        Returns:
            int: Number of channels.
        """
        nchannels = 1
        if 'c' in self.dim_order:
            c_index = self.dim_order.index('c')
            nchannels = self.tiff.pages.first.shape[c_index]
        return nchannels

    def get_rows(self):
        """
        Returns the list of row identifiers.

        Returns:
            list: Row identifiers.
        """
        return self.rows

    def get_columns(self):
        """
        Returns the list of column identifiers.

        Returns:
            list: Column identifiers.
        """
        return self.columns

    def get_wells(self):
        """
        Returns the list of well identifiers.

        Returns:
            list: Well identifiers.
        """
        return self.wells

    def get_time_points(self):
        """
        Returns the list of time points.

        Returns:
            list: Time point IDs.
        """
        nt = 1
        if 't' in self.dim_order:
            t_index = self.dim_order.index('t')
            nt = self.tiff.pages.first.shape[t_index]
        return list(range(nt))

    def get_fields(self):
        """
        Returns the list of field indices.

        Returns:
            list: Field indices.
        """
        return self.fields

    def get_acquisitions(self):
        """
        Returns acquisition metadata (empty for TIFF).

        Returns:
            list: Empty list.
        """
        return []

    def get_total_data_size(self):
        """
        Returns the estimated total data size.

        Returns:
            int: Total data size in number of pixels (multiply by the dtype itemsize for bytes).
        """
        total_size = np.prod(self.shape)
        if self.is_plate:
            total_size *= len(self.get_wells()) * len(self.get_fields())
        return total_size

    def close(self):
        """
        Closes the TIFF file.
        """
        self.tiff.close()

metadata = metadata_to_dict(file.read().decode()) instance-attribute

tiff = TiffFile(image_filename) instance-attribute

__init__(uri, metadata={})

Initialize TiffSource.

Parameters:

Name Type Description Default
uri str

Path to the TIFF file.

required
metadata dict

Optional metadata dictionary.

{}
Source code in src\TiffSource.py
def __init__(self, uri, metadata={}):
    """
    Initialize TiffSource.

    Args:
        uri (str): Path to the TIFF file.
        metadata (dict): Optional metadata dictionary.
    """
    super().__init__(uri, metadata)
    image_filename = None
    ext = os.path.splitext(uri)[1].lower()
    if 'tif' in ext:
        image_filename = uri
    elif 'ome' in ext:
        # read metadata
        with open(uri, 'rb') as file:
            self.metadata = metadata_to_dict(file.read().decode())
        # try to open first ome-tiff file
        filename = ensure_list(self.metadata.get('Image', {}))[0].get('Pixels', {}).get('TiffData', {}).get('UUID', {}).get('FileName')
        if filename:
            image_filename = os.path.join(os.path.dirname(uri), filename)
    else:
        raise RuntimeError(f'Unsupported tiff extension: {ext}')

    self.tiff = TiffFile(image_filename)

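A minimal usage sketch (the input path is hypothetical and assumes the package is importable as src from the repository root):

from src.TiffSource import TiffSource

source = TiffSource('example.ome.tif')  # hypothetical input file
source.init_metadata()                  # parse OME / ImageJ / TIFF tag metadata
print(source.get_name(), source.get_dim_order(), source.get_shape())
print(source.get_pixel_size_um())       # e.g. {'x': 0.25, 'y': 0.25}
data = source.get_data()                # full image as a numpy ndarray
source.close()
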
close()

Closes the TIFF file.

Source code in src\TiffSource.py
def close(self):
    """
    Closes the TIFF file.
    """
    self.tiff.close()

get_acquisitions()

Returns acquisition metadata (empty for TIFF).

Returns:

Name Type Description
list

Empty list.

Source code in src\TiffSource.py
def get_acquisitions(self):
    """
    Returns acquisition metadata (empty for TIFF).

    Returns:
        list: Empty list.
    """
    return []

get_channels()

Returns channel metadata.

Returns:

Name Type Description
list

List of channel dicts.

Source code in src\TiffSource.py
def get_channels(self):
    """
    Returns channel metadata.

    Returns:
        list: List of channel dicts.
    """
    return self.channels

get_columns()

Returns the list of column identifiers.

Returns:

Name Type Description
list

Column identifiers.

Source code in src\TiffSource.py
def get_columns(self):
    """
    Returns the list of column identifiers.

    Returns:
        list: Column identifiers.
    """
    return self.columns

get_data(well_id=None, field_id=None)

Gets image data from the TIFF file.

Returns:

Name Type Description
ndarray

Image data.

Source code in src\TiffSource.py
def get_data(self, well_id=None, field_id=None):
    """
    Gets image data from the TIFF file.

    Returns:
        ndarray: Image data.
    """
    data = self.tiff.asarray()
    while data.ndim < len(self.dim_order):
        data = np.expand_dims(data, 0)
    return data

get_dim_order()

Returns the dimension order string.

Returns:

Name Type Description
str

Dimension order.

Source code in src\TiffSource.py
def get_dim_order(self):
    """
    Returns the dimension order string.

    Returns:
        str: Dimension order.
    """
    return self.dim_order

get_dtype()

Returns the numpy dtype of the image data.

Returns:

Name Type Description
dtype

Numpy dtype.

Source code in src\TiffSource.py
def get_dtype(self):
    """
    Returns the numpy dtype of the image data.

    Returns:
        dtype: Numpy dtype.
    """
    return self.dtype

get_fields()

Returns the list of field indices.

Returns:

Name Type Description
list

Field indices.

Source code in src\TiffSource.py
def get_fields(self):
    """
    Returns the list of field indices.

    Returns:
        list: Field indices.
    """
    return self.fields

get_name()

Gets the image or plate name.

Returns:

Name Type Description
str

Name.

Source code in src\TiffSource.py
def get_name(self):
    """
    Gets the image or plate name.

    Returns:
        str: Name.
    """
    return self.name

get_nchannels()

Returns the number of channels.

Returns:

Name Type Description
int

Number of channels.

Source code in src\TiffSource.py
def get_nchannels(self):
    """
    Returns the number of channels.

    Returns:
        int: Number of channels.
    """
    nchannels = 1
    if 'c' in self.dim_order:
        c_index = self.dim_order.index('c')
        nchannels = self.tiff.pages.first.shape[c_index]
    return nchannels

get_pixel_size_um()

Returns the pixel size in micrometers.

Returns:

Name Type Description
dict

Pixel size for x, y, (and z).

Source code in src\TiffSource.py
def get_pixel_size_um(self):
    """
    Returns the pixel size in micrometers.

    Returns:
        dict: Pixel size for x, y, (and z).
    """
    return self.pixel_size

get_position_um(well_id=None)

Returns the position in micrometers.

Returns:

Name Type Description
dict

Position in micrometers.

Source code in src\TiffSource.py
def get_position_um(self, well_id=None):
    """
    Returns the position in micrometers.

    Returns:
        dict: Position in micrometers.
    """
    return self.position

get_rows()

Returns the list of row identifiers.

Returns:

Name Type Description
list

Row identifiers.

Source code in src\TiffSource.py
def get_rows(self):
    """
    Returns the list of row identifiers.

    Returns:
        list: Row identifiers.
    """
    return self.rows

get_shape()

Returns the shape of the image data.

Returns:

Name Type Description
tuple

Shape of the image data.

Source code in src\TiffSource.py
def get_shape(self):
    """
    Returns the shape of the image data.

    Returns:
        tuple: Shape of the image data.
    """
    return self.shape

get_time_points()

Returns the list of time points.

Returns:

Name Type Description
list

Time point IDs.

Source code in src\TiffSource.py
def get_time_points(self):
    """
    Returns the list of time points.

    Returns:
        list: Time point IDs.
    """
    nt = 1
    if 't' in self.dim_order:
        t_index = self.dim_order.index('t')
        nt = self.tiff.pages.first.shape[t_index]
    return list(range(nt))

get_total_data_size()

Returns the estimated total data size.

Returns:

Name Type Description
int

Total data size in number of pixels (elements).

Source code in src\TiffSource.py
def get_total_data_size(self):
    """
    Returns the estimated total data size.

    Returns:
        int: Total data size in number of pixels (multiply by the dtype itemsize for bytes).
    """
    total_size = np.prod(self.shape)
    if self.is_plate:
        total_size *= len(self.get_wells()) * len(self.get_fields())
    return total_size

get_wells()

Returns the list of well identifiers.

Returns:

Name Type Description
list

Well identifiers.

Source code in src\TiffSource.py
def get_wells(self):
    """
    Returns the list of well identifiers.

    Returns:
        list: Well identifiers.
    """
    return self.wells

init_metadata()

Initializes and loads metadata from the (OME) TIFF file.

Returns:

Name Type Description
dict

Metadata dictionary.

Source code in src\TiffSource.py
def init_metadata(self):
    """
    Initializes and loads metadata from the (OME) TIFF file.

    Returns:
        dict: Metadata dictionary.
    """
    self.is_ome = self.tiff.is_ome
    self.is_imagej = self.tiff.is_imagej
    pixel_size = {'x': 1, 'y': 1}
    position = {}
    channels = []
    if self.is_ome:
        metadata = metadata_to_dict(self.tiff.ome_metadata)
        if metadata and not 'BinaryOnly' in metadata:
            self.metadata = metadata
        self.is_plate = 'Plate' in self.metadata
        if self.is_plate:
            plate = self.metadata['Plate']
            self.name = plate.get('Name')
            rows = set()
            columns = set()
            wells = {}
            for well in plate['Well']:
                row = create_col_row_label(well['Row'], plate['RowNamingConvention'])
                column = create_col_row_label(well['Column'], plate['ColumnNamingConvention'])
                rows.add(row)
                columns.add(column)
                label = f'{row}{column}'
                wells[label] = well['ID']
            self.rows = sorted(rows)
            self.columns = list(columns)
            self.wells = list(wells.keys())
        else:
            self.name = self.metadata['Image'].get('Name')
        pixels = ensure_list(self.metadata.get('Image', []))[0].get('Pixels', {})
        self.shape = pixels.get('SizeT'), pixels.get('SizeC'), pixels.get('SizeZ'), pixels.get('SizeY'), pixels.get('SizeX')
        self.dim_order = ''.join(reversed(pixels['DimensionOrder'].lower()))
        self.dtype = np.dtype(pixels['Type'])
        if 'PhysicalSizeX' in pixels:
            pixel_size['x'] = convert_to_um(float(pixels.get('PhysicalSizeX')), pixels.get('PhysicalSizeXUnit'))
        if 'PhysicalSizeY' in pixels:
            pixel_size['y'] = convert_to_um(float(pixels.get('PhysicalSizeY')), pixels.get('PhysicalSizeYUnit'))
        if 'PhysicalSizeZ' in pixels:
            pixel_size['z'] = convert_to_um(float(pixels.get('PhysicalSizeZ')), pixels.get('PhysicalSizeZUnit'))
        plane = pixels.get('Plane')
        if plane:
            if 'PositionX' in plane:
                position['x'] = convert_to_um(float(plane.get('PositionX')), plane.get('PositionXUnit'))
            if 'PositionY' in plane:
                position['y'] = convert_to_um(float(plane.get('PositionY')), plane.get('PositionYUnit'))
            if 'PositionZ' in plane:
                position['z'] = convert_to_um(float(plane.get('PositionZ')), plane.get('PositionZUnit'))
        for channel0 in ensure_list(pixels.get('Channel')):
            channel = {}
            if 'Name' in channel0:
                channel['label'] = channel0['Name']
            if 'Color' in channel0:
                channel['color'] = int_to_rgba(channel0['Color'])
            channels.append(channel)
    else:
        self.is_plate = False
        if self.is_imagej:
            self.imagej_metadata = self.tiff.imagej_metadata
            pixel_size_unit = self.imagej_metadata.get('unit', '').encode().decode('unicode_escape')
            if 'scales' in self.imagej_metadata:
                for dim, scale in zip(['x', 'y'], self.imagej_metadata['scales'].split(',')):
                    scale = scale.strip()
                    if scale != '':
                        pixel_size[dim] = convert_to_um(float(scale), pixel_size_unit)
            if 'spacing' in self.imagej_metadata:
                pixel_size['z'] = convert_to_um(self.imagej_metadata['spacing'], pixel_size_unit)
        self.metadata = tags_to_dict(self.tiff.pages.first.tags)
        self.name = os.path.splitext(self.tiff.filename)[0]
        if self.tiff.series:
            page = self.tiff.series[0]
        else:
            page = self.tiff.pages.first
        self.shape = page.shape
        while len(self.shape) < 5:
            self.shape = tuple([1] + list(self.shape))
        self.dim_order = page.axes.lower().replace('s', 'c').replace('r', '')
        self.dtype = page.dtype
        res_unit = self.metadata.get('ResolutionUnit', '')
        if isinstance(res_unit, Enum):
            res_unit = res_unit.name
        res_unit = res_unit.lower()
        if res_unit == 'none':
            res_unit = ''
        if 'x' not in pixel_size:
            res0 = convert_rational_value(self.metadata.get('XResolution'))
            if res0 is not None and res0 != 0:
                pixel_size['x'] = convert_to_um(1 / res0, res_unit)
        if 'y' not in pixel_size:
            res0 = convert_rational_value(self.metadata.get('YResolution'))
            if res0 is not None and res0 != 0:
                pixel_size['y'] = convert_to_um(1 / res0, res_unit)
    self.pixel_size = pixel_size
    self.position = position
    self.channels = channels
    return self.metadata

is_screen()

Checks if the source is a plate/screen.

Returns:

Name Type Description
bool

True if plate/screen.

Source code in src\TiffSource.py
def is_screen(self):
    """
    Checks if the source is a plate/screen.

    Returns:
        bool: True if plate/screen.
    """
    return self.is_plate

convert_rational_value(value)

Converts a rational value tuple to a float.

Parameters:

Name Type Description Default
value tuple or None

Rational value.

required

Returns:

Type Description

float or None: Converted value.

Source code in src\TiffSource.py
def convert_rational_value(value):
    """
    Converts a rational value tuple to a float.

    Args:
        value (tuple or None): Rational value.

    Returns:
        float or None: Converted value.
    """
    if value is not None and isinstance(value, tuple):
        if value[0] == value[1]:
            value = value[0]
        else:
            value = value[0] / value[1]
    return value

tags_to_dict(tags)

Converts TIFF tags to a dictionary.

Parameters:

Name Type Description Default
tags

TIFF tags object.

required

Returns:

Name Type Description
dict

Tag name-value mapping.

Source code in src\TiffSource.py
def tags_to_dict(tags):
    """
    Converts TIFF tags to a dictionary.

    Args:
        tags: TIFF tags object.

    Returns:
        dict: Tag name-value mapping.
    """
    tag_dict = {}
    for tag in tags.values():
        tag_dict[tag.name] = tag.value
    return tag_dict

Timer

Timer

Bases: object

Context manager for timing code execution and logging the elapsed time.

Source code in src\Timer.py
class Timer(object):
    """
    Context manager for timing code execution and logging the elapsed time.
    """

    def __init__(self, title, auto_unit=True, verbose=True):
        """
        Initialize the Timer.

        Args:
            title (str): Description for the timed block.
            auto_unit (bool): Automatically select time unit (seconds/minutes/hours).
            verbose (bool): If True, log the elapsed time.
        """
        self.title = title
        self.auto_unit = auto_unit
        self.verbose = verbose

    def __enter__(self):
        """
        Start timing.
        """
        self.ptime_start = time.process_time()
        self.time_start = time.time()

    def __exit__(self, type, value, traceback):
        """
        Stop timing and log the elapsed time.

        Args:
            type: Exception type, if any.
            value: Exception value, if any.
            traceback: Exception traceback, if any.
        """
        if self.verbose:
            ptime_end = time.process_time()
            time_end = time.time()
            pelapsed = ptime_end - self.ptime_start
            elapsed = time_end - self.time_start
            unit = 'seconds'
            if self.auto_unit and elapsed >= 60:
                pelapsed /= 60
                elapsed /= 60
                unit = 'minutes'
                if elapsed >= 60:
                    pelapsed /= 60
                    elapsed /= 60
                    unit = 'hours'
            logging.info(f'Time {self.title}: {elapsed:.1f} ({pelapsed:.1f}) {unit}')

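A short sketch of Timer used as a context manager; it assumes logging has already been configured so the elapsed-time message is visible:

import logging
import time

from src.Timer import Timer

logging.basicConfig(level=logging.INFO)

with Timer('example workload'):
    time.sleep(1.5)  # stand-in for real work; the elapsed time is logged on exit
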
auto_unit = auto_unit instance-attribute

title = title instance-attribute

verbose = verbose instance-attribute

__enter__()

Start timing.

Source code in src\Timer.py
def __enter__(self):
    """
    Start timing.
    """
    self.ptime_start = time.process_time()
    self.time_start = time.time()

__exit__(type, value, traceback)

Stop timing and log the elapsed time.

Parameters:

Name Type Description Default
type

Exception type, if any.

required
value

Exception value, if any.

required
traceback

Exception traceback, if any.

required
Source code in src\Timer.py
def __exit__(self, type, value, traceback):
    """
    Stop timing and log the elapsed time.

    Args:
        type: Exception type, if any.
        value: Exception value, if any.
        traceback: Exception traceback, if any.
    """
    if self.verbose:
        ptime_end = time.process_time()
        time_end = time.time()
        pelapsed = ptime_end - self.ptime_start
        elapsed = time_end - self.time_start
        unit = 'seconds'
        if self.auto_unit and elapsed >= 60:
            pelapsed /= 60
            elapsed /= 60
            unit = 'minutes'
            if elapsed >= 60:
                pelapsed /= 60
                elapsed /= 60
                unit = 'hours'
        logging.info(f'Time {self.title}: {elapsed:.1f} ({pelapsed:.1f}) {unit}')

__init__(title, auto_unit=True, verbose=True)

Initialize the Timer.

Parameters:

Name Type Description Default
title str

Description for the timed block.

required
auto_unit bool

Automatically select time unit (seconds/minutes/hours).

True
verbose bool

If True, log the elapsed time.

True
Source code in src\Timer.py
def __init__(self, title, auto_unit=True, verbose=True):
    """
    Initialize the Timer.

    Args:
        title (str): Description for the timed block.
        auto_unit (bool): Automatically select time unit (seconds/minutes/hours).
        verbose (bool): If True, log the elapsed time.
    """
    self.title = title
    self.auto_unit = auto_unit
    self.verbose = verbose

WindowScanner

WindowScanner

Computes quantile-based min/max window for image channels.

Source code in src\WindowScanner.py
class WindowScanner:
    """
    Computes quantile-based min/max window for image channels.
    """

    def __init__(self):
        """
        Initialize WindowScanner.
        """
        self.min = {}
        self.max = {}

    def process(self, data, dim_order, min_quantile=0.01, max_quantile=0.99):
        """
        Processes image data to compute min/max quantiles for each channel.

        Args:
            data (ndarray): Image data.
            dim_order (str): Dimension order string.
            min_quantile (float): Lower quantile.
            max_quantile (float): Upper quantile.
        """
        if 'c' in dim_order:
            nc = data.shape[dim_order.index('c')]
        else:
            nc = 1
        for channeli in range(nc):
            if 'c' in dim_order:
                channel_data = data.take(channeli, axis=dim_order.index('c'))
            else:
                channel_data = data
            min1, max1 = np.quantile(channel_data, q=[min_quantile, max_quantile])
            if data.dtype.kind in ['u', 'i']:
                min1, max1 = int(min1), int(max1)
            if channeli not in self.min:
                self.min[channeli] = min1
            else:
                self.min[channeli] = min(min1, self.min[channeli])
            if channeli not in self.max:
                self.max[channeli] = max1
            else:
                self.max[channeli] = max(max1, self.max[channeli])

    def get_window(self):
        """
        Returns the computed min/max window for channels.

        Returns:
            tuple: (min dict, max dict)
        """
        return self.min, self.max

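A minimal sketch using synthetic data with a channel axis (shape and quantiles are illustrative):

import numpy as np

from src.WindowScanner import WindowScanner

data = np.random.randint(0, 4096, size=(1, 2, 1, 256, 256), dtype=np.uint16)

scanner = WindowScanner()
scanner.process(data, dim_order='tczyx')  # accumulates per-channel quantile min/max
start, end = scanner.get_window()         # dicts keyed by channel index
print(start[0], end[0])
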
max = {} instance-attribute

min = {} instance-attribute

__init__()

Initialize WindowScanner.

Source code in src\WindowScanner.py
def __init__(self):
    """
    Initialize WindowScanner.
    """
    self.min = {}
    self.max = {}

get_window()

Returns the computed min/max window for channels.

Returns:

Name Type Description
tuple

(min dict, max dict)

Source code in src\WindowScanner.py
def get_window(self):
    """
    Returns the computed min/max window for channels.

    Returns:
        tuple: (min dict, max dict)
    """
    return self.min, self.max

process(data, dim_order, min_quantile=0.01, max_quantile=0.99)

Processes image data to compute min/max quantiles for each channel.

Parameters:

Name Type Description Default
data ndarray

Image data.

required
dim_order str

Dimension order string.

required
min_quantile float

Lower quantile.

0.01
max_quantile float

Upper quantile.

0.99
Source code in src\WindowScanner.py
def process(self, data, dim_order, min_quantile=0.01, max_quantile=0.99):
    """
    Processes image data to compute min/max quantiles for each channel.

    Args:
        data (ndarray): Image data.
        dim_order (str): Dimension order string.
        min_quantile (float): Lower quantile.
        max_quantile (float): Upper quantile.
    """
    if 'c' in dim_order:
        nc = data.shape[dim_order.index('c')]
    else:
        nc = 1
    for channeli in range(nc):
        if 'c' in dim_order:
            channel_data = data.take(channeli, axis=dim_order.index('c'))
        else:
            channel_data = data
        min1, max1 = np.quantile(channel_data, q=[min_quantile, max_quantile])
        if data.dtype.kind in ['u', 'i']:
            min1, max1 = int(min1), int(max1)
        if channeli not in self.min:
            self.min[channeli] = min1
        else:
            self.min[channeli] = min(min1, self.min[channeli])
        if channeli not in self.max:
            self.max[channeli] = max1
        else:
            self.max[channeli] = max(max1, self.max[channeli])

color_conversion

hexrgb_to_rgba(hexrgb)

Source code in src\color_conversion.py
def hexrgb_to_rgba(hexrgb: str) -> list:
    hexrgb = hexrgb.lstrip('#')
    if len(hexrgb) == 6:
        hexrgb += 'FF'  # add alpha
    rgba = int_to_rgba(int(hexrgb, 16))  # parse the hex string directly instead of eval
    return rgba

int_to_rgba(intrgba)

Source code in src\color_conversion.py
def int_to_rgba(intrgba: int) -> list:
    signed = (intrgba < 0)
    rgba = [x / 255 for x in intrgba.to_bytes(4, signed=signed, byteorder="big")]
    if rgba[-1] == 0:
        rgba[-1] = 1
    return rgba

rgba_to_hexrgb(rgba)

Source code in src\color_conversion.py
def rgba_to_hexrgb(rgba: list) -> str:
    hexrgb = ''.join([hex(int(x * 255))[2:].upper().zfill(2) for x in rgba[:3]])
    return hexrgb

rgba_to_int(rgba)

Source code in src\color_conversion.py
def rgba_to_int(rgba: list) -> int:
    intrgba = int.from_bytes([int(x * 255) for x in rgba], signed=True, byteorder="big")
    return intrgba

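These helpers round-trip between hex strings, normalised RGBA lists and the signed 32-bit integers used in OME-XML; a small sketch:

from src.color_conversion import hexrgb_to_rgba, int_to_rgba, rgba_to_hexrgb, rgba_to_int

rgba = hexrgb_to_rgba('#FF0000')    # [1.0, 0.0, 0.0, 1.0]
print(rgba_to_hexrgb(rgba))         # 'FF0000'
as_int = rgba_to_int(rgba)          # signed integer as stored in an OME Color
print(int_to_rgba(as_int) == rgba)  # True
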
helper

create_source(filename)

Create an image source object based on the input file extension.

Parameters:

Name Type Description Default
filename str

Path to the input file.

required

Returns:

Name Type Description
ImageSource

Source object for the input file.

Raises:

Type Description
ValueError

If the file format is unsupported.

Source code in src\helper.py
def create_source(filename):
    """
    Create an image source object based on the input file extension.

    Args:
        filename (str): Path to the input file.

    Returns:
        ImageSource: Source object for the input file.

    Raises:
        ValueError: If the file format is unsupported.
    """
    input_ext = os.path.splitext(filename)[1].lower()

    if input_ext == '.db':
        from src.ImageDbSource import ImageDbSource
        source = ImageDbSource(filename)
    elif input_ext == '.isyntax':
        from src.ISyntaxSource import ISyntaxSource
        source = ISyntaxSource(filename)
    elif 'tif' in input_ext:
        from src.TiffSource import TiffSource
        source = TiffSource(filename)
    else:
        raise ValueError(f'Unsupported input file format: {input_ext}')
    return source

create_writer(output_format, verbose=False)

Create a writer object and output extension based on the output format.

Parameters:

Name Type Description Default
output_format str

Output format string.

required
verbose bool

If True, enables verbose output.

False

Returns:

Name Type Description
tuple

(writer object, output file extension)

Raises:

Type Description
ValueError

If the output format is unsupported.

Source code in src\helper.py
def create_writer(output_format, verbose=False):
    """
    Create a writer object and output extension based on the output format.

    Args:
        output_format (str): Output format string.
        verbose (bool): If True, enables verbose output.

    Returns:
        tuple: (writer object, output file extension)

    Raises:
        ValueError: If the output format is unsupported.
    """
    if 'zar' in output_format:
        if '3' in output_format:
            zarr_version = 3
            ome_version = '0.5'
        else:
            zarr_version = 2
            ome_version = '0.4'
        from src.OmeZarrWriter import OmeZarrWriter
        writer = OmeZarrWriter(zarr_version=zarr_version, ome_version=ome_version, verbose=verbose)
        ext = '.ome.zarr'
    elif 'tif' in output_format:
        from src.OmeTiffWriter import OmeTiffWriter
        writer = OmeTiffWriter(verbose=verbose)
        ext = '.ome.tiff'
    else:
        raise ValueError(f'Unsupported output format: {output_format}')
    return writer, ext

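These two factories provide the source and writer pairing used during conversion; a hedged sketch (file names and the output folder are hypothetical, and the write call is left commented out):

import os

from src.helper import create_source, create_writer

source = create_source('input.ome.tif')  # picks TiffSource based on the extension
writer, ext = create_writer('omezarr2')  # OME-Zarr v2 writer, ext == '.ome.zarr'

source.init_metadata()
output_path = os.path.join('output', source.get_name() + ext)
# writer.write(output_path, source)      # uncomment to perform the conversion
source.close()
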
ome_tiff_util

create_binaryonly_metadata(metadata_filename, companion_uuid)

Source code in src\ome_tiff_util.py
def create_binaryonly_metadata(metadata_filename, companion_uuid):
    ome = OME()
    ome.uuid = create_uuid()
    ome.creator = f'nl.biomero.OmeTiffWriter {VERSION}'
    ome.binary_only = OME.BinaryOnly(metadata_file=metadata_filename, uuid=companion_uuid)
    return to_xml(ome), ome.uuid

create_col_row_label(index, naming_convention)

Source code in src\ome_tiff_util.py
def create_col_row_label(index, naming_convention):
    label = index + 1
    if naming_convention.lower() == NamingConvention.LETTER.name.lower():
        label = chr(ord('A') + index)
    return str(label)

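For example, with the OME naming conventions, create_col_row_label(0, 'letter') returns 'A' and create_col_row_label(0, 'number') returns '1' (indices are zero-based, labels are one-based).
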
create_image_metadata(source, image_uuid=None, image_filename=None)

Source code in src\ome_tiff_util.py
def create_image_metadata(source, image_uuid=None, image_filename=None):
    t, c, z, y, x = source.get_shape()
    pixel_size = source.get_pixel_size_um()
    ome_channels = []
    for channeli, channel in enumerate(source.get_channels()):
        ome_channel = Channel()
        ome_channel.name = channel.get('label', channel.get('Name', f'{channeli}'))
        ome_channel.samples_per_pixel = 1
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            ome_channel.color = Color(rgba_to_int(color))
        ome_channels.append(ome_channel)

    tiff_data = TiffData()
    tiff_data.uuid = TiffData.UUID(value=image_uuid, file_name=image_filename)

    pixels = Pixels(
        dimension_order=source.get_dim_order()[::-1].upper(),
        type=PixelType(str(source.get_dtype())),
        channels=ome_channels,
        size_t=t, size_c=c, size_z=z, size_y=y, size_x=x,
        tiff_data_blocks=[tiff_data]
    )
    if 'x' in pixel_size:
        pixels.physical_size_x = pixel_size['x']
        pixels.physical_size_x_unit = UnitsLength.MICROMETER
    if 'y' in pixel_size:
        pixels.physical_size_y = pixel_size['y']
        pixels.physical_size_y_unit = UnitsLength.MICROMETER
    if 'z' in pixel_size:
        pixels.physical_size_z = pixel_size['z']
        pixels.physical_size_z_unit = UnitsLength.MICROMETER

    image = Image(pixels=pixels)
    index = pixels.id.split(':')[1]
    for channeli, channel in enumerate(pixels.channels):
        channel.id = f'Channel:{index}:{channeli}'
    return image

create_metadata(source, companion_uuid=None, image_uuids=None, image_filenames=None)

Source code in src\ome_tiff_util.py
def create_metadata(source, companion_uuid=None, image_uuids=None, image_filenames=None):
    ome = OME()
    ome.uuid = companion_uuid
    ome.creator = f'nl.biomero.OmeTiffWriter {VERSION}'

    if source.is_screen():
        columns = source.get_columns()
        rows = source.get_rows()

        plate = Plate()
        plate.columns = len(columns)
        plate.rows = len(rows)
        plate.row_naming_convention = get_col_row_type(rows)
        plate.column_naming_convention = get_col_row_type(columns)

        image_index = 0
        for well_id in source.get_wells():
            row, col = split_well_name(well_id)
            col_index = columns.index(col)
            row_index = rows.index(row)
            well = Well(column=col_index, row=row_index)
            well.id = f'Well:{well_id}'
            for field in source.get_fields():
                sample = WellSample(index=image_index)
                sample.id = f'WellSample:{well_id}:{field}'
                position = source.get_position_um(well_id)
                if 'x' in position:
                    sample.position_x = position['x']
                    sample.position_x_unit = UnitsLength.MICROMETER
                if 'y' in position:
                    sample.position_y = position['y']
                    sample.position_y_unit = UnitsLength.MICROMETER

                image = create_image_metadata(source,
                                              image_uuids[image_index],
                                              image_filenames[image_index])
                ome.images.append(image)

                image_ref = ImageRef(id=image.id)   # assign id at instantiation to avoid auto sequence increment
                sample.image_ref = image_ref
                well.well_samples.append(sample)

                image_index += 1

            plate.wells.append(well)

        ome.plates = [plate]
    else:
        ome.images = [create_image_metadata(source, ome.uuid, source.get_name())]

    return to_xml(ome)

create_resolution_metadata(source)

Source code in src\ome_tiff_util.py
def create_resolution_metadata(source):
    pixel_size_um = source.get_pixel_size_um()
    resolution_unit = 'CENTIMETER'
    resolution = [1e4 / pixel_size_um[dim] for dim in 'xy']
    return resolution, resolution_unit

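The resolution is expressed in pixels per centimeter, i.e. 1e4 divided by the pixel size in micrometers: a source reporting a 0.25 µm pixel size in x and y yields [40000.0, 40000.0] with unit 'CENTIMETER'.
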
create_uuid()

Source code in src\ome_tiff_util.py
def create_uuid():
    return f'urn:uuid:{uuid.uuid4()}'

get_col_row_type(labels)

Source code in src\ome_tiff_util.py
def get_col_row_type(labels):
    is_digits = [label.isdigit() for label in labels]
    if np.all(is_digits):
        naming_convention = NamingConvention.NUMBER
    else:
        naming_convention = NamingConvention.LETTER
    return naming_convention

metadata_to_dict(xml_metadata)

Source code in src\ome_tiff_util.py
def metadata_to_dict(xml_metadata):
    metadata = xml2dict(xml_metadata)
    if 'OME' in metadata:
        metadata = metadata['OME']
    return metadata

ome_zarr_util

create_axes_metadata(dimension_order)

Create axes metadata for OME-Zarr from dimension order.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required

Returns:

Name Type Description
list

List of axis metadata dictionaries.

Source code in src\ome_zarr_util.py
def create_axes_metadata(dimension_order):
    """
    Create axes metadata for OME-Zarr from dimension order.

    Args:
        dimension_order (str): String of dimension characters.

    Returns:
        list: List of axis metadata dictionaries.
    """
    axes = []
    for dimension in dimension_order:
        unit1 = None
        if dimension == 't':
            type1 = 'time'
            unit1 = 'millisecond'
        elif dimension == 'c':
            type1 = 'channel'
        else:
            type1 = 'space'
            unit1 = 'micrometer'
        axis = {'name': dimension, 'type': type1}
        if unit1 is not None and unit1 != '':
            axis['unit'] = unit1
        axes.append(axis)
    return axes

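A small sketch of the resulting axes list for the common tczyx order:

from src.ome_zarr_util import create_axes_metadata

axes = create_axes_metadata('tczyx')
# [{'name': 't', 'type': 'time', 'unit': 'millisecond'},
#  {'name': 'c', 'type': 'channel'},
#  {'name': 'z', 'type': 'space', 'unit': 'micrometer'}, ...]
print(axes)
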
create_channel_metadata(dtype, channels, nchannels, window, ome_version)

Create channel metadata for OME-Zarr.

Parameters:

Name Type Description Default
dtype

Numpy dtype of image data.

required
channels list

List of channel dicts.

required
nchannels int

Number of channels.

required
window tuple

Min/max window values.

required
ome_version str

OME-Zarr version.

required

Returns:

Name Type Description
dict

Channel metadata dictionary.

Source code in src\ome_zarr_util.py
def create_channel_metadata(dtype, channels, nchannels, window, ome_version):
    """
    Create channel metadata for OME-Zarr.

    Args:
        dtype: Numpy dtype of image data.
        channels (list): List of channel dicts.
        nchannels (int): Number of channels.
        window (tuple): Min/max window values.
        ome_version (str): OME-Zarr version.

    Returns:
        dict: Channel metadata dictionary.
    """
    if len(channels) < nchannels:
        labels = []
        colors = []
        if nchannels in (3, 4):
            labels = ['Red', 'Green', 'Blue']
            colors = ["FF0000", "00FF00", "0000FF"]
        if nchannels == 4:
            labels += ['Alpha']
            colors += ["FFFFFF"]
        channels = [{'label': label, 'color': color} for label, color in zip(labels, colors)]

    omezarr_channels = []
    start, end = window
    for channeli, channel in enumerate(channels):
        omezarr_channel = {'label': channel.get('label', channel.get('Name', f'{channeli}')), 'active': True}
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            omezarr_channel['color'] = rgba_to_hexrgb(color)
        if dtype.kind == 'f':
            min, max = 0, 1
        else:
            info = np.iinfo(dtype)
            min, max = info.min, info.max
        omezarr_channel['window'] = {'min': min, 'max': max, 'start': start[channeli], 'end': end[channeli]}
        omezarr_channels.append(omezarr_channel)

    metadata = {
        'version': ome_version,
        'channels': omezarr_channels,
    }
    return metadata

create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um={})

Create transformation metadata (scale and translation) for OME-Zarr.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required
pixel_size_um dict

Pixel size in micrometers per dimension.

required
scale float

Scaling factor.

required
translation_um dict

Translation in micrometers per dimension.

{}

Returns:

Name Type Description
list

List of transformation metadata dictionaries.

Source code in src\ome_zarr_util.py
def create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um={}):
    """
    Create transformation metadata (scale and translation) for OME-Zarr.

    Args:
        dimension_order (str): String of dimension characters.
        pixel_size_um (dict): Pixel size in micrometers per dimension.
        scale (float): Scaling factor.
        translation_um (dict, optional): Translation in micrometers per dimension.

    Returns:
        list: List of transformation metadata dictionaries.
    """
    metadata = []
    pixel_size_scale = []
    translation_scale = []
    for dimension in dimension_order:
        pixel_size_scale1 = pixel_size_um.get(dimension, 1)
        if pixel_size_scale1 == 0:
            pixel_size_scale1 = 1
        if dimension in ['x', 'y']:
            pixel_size_scale1 /= scale
        pixel_size_scale.append(pixel_size_scale1)

        translation1 = translation_um.get(dimension, 0)
        if dimension in ['x', 'y']:
            translation1 *= scale
        translation_scale.append(translation1)

    metadata.append({'type': 'scale', 'scale': pixel_size_scale})
    if not all(v == 0 for v in translation_scale):
        metadata.append({'type': 'translation', 'translation': translation_scale})
    return metadata

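A hedged sketch, reading scale as the x/y downscale factor of a pyramid level (0.5 meaning half resolution, so the effective x/y pixel size doubles):

from src.ome_zarr_util import create_transformation_metadata

transforms = create_transformation_metadata('tczyx', {'x': 0.5, 'y': 0.5}, scale=0.5)
print(transforms)
# [{'type': 'scale', 'scale': [1, 1, 1, 1.0, 1.0]}]
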
scale_dimensions_dict(shape0, scale)

Scale x and y dimensions in a shape dictionary.

Parameters:

Name Type Description Default
shape0 dict

Original shape dictionary.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
dict

Scaled shape dictionary.

Source code in src\ome_zarr_util.py
def scale_dimensions_dict(shape0, scale):
    """
    Scale x and y dimensions in a shape dictionary.

    Args:
        shape0 (dict): Original shape dictionary.
        scale (float): Scaling factor.

    Returns:
        dict: Scaled shape dictionary.
    """
    shape = {}
    if scale == 1:
        return shape0
    for dimension, shape1 in shape0.items():
        if dimension[0] in ['x', 'y']:
            shape1 = int(shape1 * scale)
        shape[dimension] = shape1
    return shape

scale_dimensions_xy(shape0, dimension_order, scale)

Scale x and y dimensions in a shape tuple.

Parameters:

Name Type Description Default
shape0 tuple

Original shape.

required
dimension_order str

String of dimension characters.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
list

Scaled shape.

Source code in src\ome_zarr_util.py
def scale_dimensions_xy(shape0, dimension_order, scale):
    """
    Scale x and y dimensions in a shape tuple.

    Args:
        shape0 (tuple): Original shape.
        dimension_order (str): String of dimension characters.
        scale (float): Scaling factor.

    Returns:
        list: Scaled shape.
    """
    shape = []
    if scale == 1:
        return shape0
    for shape1, dimension in zip(shape0, dimension_order):
        if dimension[0] in ['x', 'y']:
            shape1 = int(shape1 * scale)
        shape.append(shape1)
    return shape

parameters

VERSION = '0.0.15' module-attribute

util

convert_dotnet_ticks_to_datetime(net_ticks)

Source code in src\util.py
def convert_dotnet_ticks_to_datetime(net_ticks):
    return datetime(1, 1, 1) + timedelta(microseconds=net_ticks // 10)

convert_to_um(value, unit)

Source code in src\util.py
def convert_to_um(value, unit):
    conversions = {
        'nm': 1e-3,
        'µm': 1, 'um': 1, 'micrometer': 1, 'micron': 1,
        'mm': 1e3, 'millimeter': 1e3,
        'cm': 1e4, 'centimeter': 1e4,
        'm': 1e6, 'meter': 1e6
    }
    return value * conversions.get(unit, 1)

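A few illustrative conversions (unknown unit strings fall through unchanged):

from src.util import convert_to_um

print(convert_to_um(0.5, 'mm'))  # 500.0
print(convert_to_um(250, 'nm'))  # 0.25
print(convert_to_um(3, 'inch'))  # 3 - unit not in the table, value returned as-is
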
ensure_list(item)

Source code in src\util.py
def ensure_list(item):
    if not isinstance(item, (list, tuple)):
        item = [item]
    return item

get_filetitle(filename)

Source code in src\util.py
def get_filetitle(filename):
    return os.path.basename(os.path.splitext(filename)[0])

pad_leading_zero(input_string, num_digits=2)

Source code in src\util.py
def pad_leading_zero(input_string, num_digits=2):
    output = str(input_string)
    is_well = not output[0].isdigit()
    if is_well:
        row, col = split_well_name(output, remove_leading_zeros=True)
        output = str(col)
    while len(output) < num_digits:
        output = '0' + output
    if is_well:
        output = row + output
    return output

print_dict(value, tab=0, max_len=250, bullet=False)

Source code in src\util.py
def print_dict(value, tab=0, max_len=250, bullet=False):
    s = ''
    if isinstance(value, dict):
        for key, subvalue in value.items():
            s += '\n'
            if bullet:
                s += '-'
                bullet = False
            s += '\t' * tab + str(key) + ': '
            if isinstance(subvalue, dict):
                s += print_dict(subvalue, tab+1)
            elif isinstance(subvalue, list):
                for v in subvalue:
                    s += print_dict(v, tab+1, bullet=True)
            else:
                subvalue = str(subvalue)
                if len(subvalue) > max_len:
                    subvalue = subvalue[:max_len] + '...'
                s += subvalue
    else:
        s += str(value) + ' '
    return s

print_hbytes(nbytes)

Source code in src\util.py
def print_hbytes(nbytes):
    exps = ['', 'K', 'M', 'G', 'T', 'P', 'E']
    div = 1024
    exp = 0
    while nbytes > div:
        nbytes /= div
        exp += 1
    if exp < len(exps):
        e = exps[exp]
    else:
        e = f'e{exp * 3}'
    return f'{nbytes:.1f}{e}B'

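For example, print_hbytes(3 * 1024**3) returns '3.0GB' and print_hbytes(512) returns '512.0B'.
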
split_well_name(well_name, remove_leading_zeros=True, col_as_int=False)

Source code in src\util.py
def split_well_name(well_name, remove_leading_zeros=True, col_as_int=False):
    matches = re.findall(r'(\D+)(\d+)', well_name)
    if len(matches) > 0:
        row, col = matches[0]
        if col_as_int or remove_leading_zeros:
            try:
                col = int(col)
            except ValueError:
                pass
        if not col_as_int:
            col = str(col)
        return row, col
    else:
        raise ValueError(f"Invalid well name format: {well_name}. Expected format like 'A1', 'B2', etc.")

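A couple of illustrative calls:

from src.util import split_well_name, strip_leading_zeros

print(split_well_name('A01'))                   # ('A', '1')
print(split_well_name('A01', col_as_int=True))  # ('A', 1)
print(strip_leading_zeros('B007'))              # 'B7'
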
splitall(path)

Source code in src\util.py
def splitall(path):
    allparts = []
    while True:
        parts = os.path.split(path)
        if parts[0] == path:  # sentinel for absolute paths
            allparts.insert(0, parts[0])
            break
        elif parts[1] == path: # sentinel for relative paths
            allparts.insert(0, parts[1])
            break
        else:
            path = parts[0]
            allparts.insert(0, parts[1])
    return allparts

strip_leading_zeros(well_name)

Source code in src\util.py
def strip_leading_zeros(well_name):
    row, col = split_well_name(well_name, remove_leading_zeros=True)
    return f'{row}{col}'

xml_content_to_dict(element)

Source code in src\util.py
def xml_content_to_dict(element):
    key = element.tag
    children = list(element)
    if key == 'Array':
        res = [xml_content_to_dict(child) for child in children]
        return res
    if len(children) > 0:
        if children[0].tag == 'Array':
            value = []
        else:
            value = {}
        for child in children:
            child_value = xml_content_to_dict(child)
            if isinstance(child_value, list):
                value.extend(child_value)
            else:
                value |= child_value
    else:
        value = element.text
        if value is not None:
            if '"' in value:
                value = value.replace('"', '')
            else:
                for t in (float, int, bool):
                    try:
                        if t == bool:
                            if value.lower() == 'true':
                                value = True
                            if value.lower() == 'false':
                                value = False
                        else:
                            value = t(value)
                        break
                    except (TypeError, ValueError):
                        pass

    if key == 'DataObject':
        key = element.attrib['ObjectType']
    if key == 'Attribute':
        key = element.attrib['Name']
    return {key: value}
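
A small sketch of the DataObject/Attribute handling with an illustrative XML fragment (element and attribute names are made up for the example):

import xml.etree.ElementTree as ET

from src.util import xml_content_to_dict

xml = '<DataObject ObjectType="Image"><Attribute Name="PixelSize">0.25</Attribute></DataObject>'
print(xml_content_to_dict(ET.fromstring(xml)))
# {'Image': {'PixelSize': 0.25}}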