Skip to content

References

convert(input_filename, output_folder, alt_output_folder=None, output_format='omezarr2', show_progress=False, verbose=False, max_attempts=RETRY_ATTEMPTS, **kwargs)

Source code in converter.py
33
34
35
36
37
38
39
40
41
42
43
44
45
def convert(input_filename, output_folder, alt_output_folder=None,
            output_format='omezarr2', show_progress=False, verbose=False, max_attempts=RETRY_ATTEMPTS, **kwargs):
    """
    Convert an image file, retrying on failure.

    Args:
        input_filename (str): Path to the input image file.
        output_folder (str): Destination folder for the converted output.
        alt_output_folder (str): Optional alternative output folder.
        output_format (str): Target output format identifier.
        show_progress (bool): If True, show conversion progress.
        verbose (bool): If True, enable verbose output.
        max_attempts (int): Maximum number of conversion attempts.
        **kwargs: Additional arguments forwarded to the converter.

    Returns:
        The result of the underlying conversion.

    Raises:
        Exception: If conversion still fails after max_attempts attempts.
    """
    attempts = 0
    while True:
        try:
            return _convert(input_filename, output_folder, alt_output_folder=alt_output_folder,
                            output_format=output_format, show_progress=show_progress, verbose=verbose,
                            **kwargs)
        except Exception as e:
            attempts += 1
            if attempts >= max_attempts:
                logging.error(e)
                # report the limit actually used (max_attempts, not the module default),
                # and chain the original exception so the root cause is not lost
                raise Exception(f'Conversion failed after {max_attempts} attempts: {input_filename}') from e

init_logging(log_filename, verbose=False)

Initialize logging to file and optionally to console.

Parameters:

Name Type Description Default
log_filename str

Path to the log file.

required
verbose bool

If True, also log to console.

False
Source code in converter.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def init_logging(log_filename, verbose=False):
    """
    Initialize logging to file and optionally to console.

    Args:
        log_filename (str): Path to the log file.
        verbose (bool): If True, also log to console.
    """
    basepath = os.path.dirname(log_filename)
    if basepath:
        # exist_ok avoids a race between the existence check and directory creation
        os.makedirs(basepath, exist_ok=True)
    handlers = [logging.FileHandler(log_filename, encoding='utf-8')]
    if verbose:
        handlers.append(logging.StreamHandler())
    # NOTE: basicConfig ignores the 'encoding' argument when 'handlers' is given,
    # so the encoding is set on the FileHandler above instead
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s',
                        handlers=handlers)

    logging.getLogger('ome_zarr').setLevel(logging.WARNING)     # mute verbose ome_zarr logging

DbReader

DbReader

Reads and queries a SQLite database, returning results as dictionaries.

Source code in src/DbReader.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class DbReader:
    """
    Reads and queries a SQLite database, returning results as dictionaries.
    """

    def __init__(self, db_file):
        """
        Initialize DbReader with a database file.

        Args:
            db_file (str): Path to the SQLite database file.
        """
        self.conn = sqlite3.connect(db_file)
        self.conn.row_factory = DbReader.dict_factory

    @staticmethod
    def dict_factory(cursor, row):
        """
        Converts a database row to a dictionary.

        Args:
            cursor: SQLite cursor object.
            row: Row data.

        Returns:
            dict: Mapping column names to values.
        """
        # cursor.description yields one 7-tuple per column; index 0 is the name
        return {column[0]: value for column, value in zip(cursor.description, row)}

    def fetch_all(self, query, params=(), return_dicts=True):
        """
        Executes a query and fetches all results.

        Args:
            query (str): SQL query string.
            params (Sequence): Query parameters.
            return_dicts (bool): If True, returns list of dicts; else, returns first column values.

        Returns:
            list: Query results.
        """
        cursor = self.conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()
        if return_dicts:
            return rows
        # extract the first column value of each row dict
        return [next(iter(row.values())) for row in rows]

    def close(self):
        """
        Closes the database connection.
        """
        self.conn.close()

conn = sqlite3.connect(db_file) instance-attribute

__init__(db_file)

Initialize DBReader with a database file.

Parameters:

Name Type Description Default
db_file str

Path to the SQLite database file.

required
Source code in src/DbReader.py
 9
10
11
12
13
14
15
16
17
def __init__(self, db_file):
    """
    Initialize DBReader with a database file.

    Args:
        db_file (str): Path to the SQLite database file.
    """
    self.conn = sqlite3.connect(db_file)
    self.conn.row_factory = DbReader.dict_factory

close()

Closes the database connection.

Source code in src/DbReader.py
57
58
59
60
61
def close(self):
    """
    Closes the database connection.
    """
    self.conn.close()

dict_factory(cursor, row) staticmethod

Converts a database row to a dictionary.

Parameters:

Name Type Description Default
cursor

SQLite cursor object.

required
row

Row data.

required

Returns:

Name Type Description
dict

Mapping column names to values.

Source code in src/DbReader.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@staticmethod
def dict_factory(cursor, row):
    """
    Converts a database row to a dictionary.

    Args:
        cursor: SQLite cursor object.
        row: Row data.

    Returns:
        dict: Mapping column names to values.
    """
    dct = {}
    for index, column in enumerate(cursor.description):
        dct[column[0]] = row[index]
    return dct

fetch_all(query, params=[], return_dicts=True)

Executes a query and fetches all results.

Parameters:

Name Type Description Default
query str

SQL query string.

required
params list

Query parameters.

[]
return_dicts bool

If True, returns list of dicts; else, returns first column values.

True

Returns:

Name Type Description
list

Query results.

Source code in src/DbReader.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def fetch_all(self, query, params=[], return_dicts=True):
    """
    Executes a query and fetches all results.

    Args:
        query (str): SQL query string.
        params (list): Query parameters.
        return_dicts (bool): If True, returns list of dicts; else, returns first column values.

    Returns:
        list: Query results.
    """
    cursor = self.conn.cursor()
    cursor.execute(query, params)
    dct = cursor.fetchall()
    if return_dicts:
        values = dct
    else:
        values = [list(row.values())[0] for row in dct]
    return values

ISyntaxSource

PYRAMID_DOWNSCALE = 2 module-attribute

PYRAMID_LEVELS = 6 module-attribute

RETRY_ATTEMPTS = 3 module-attribute

TIFF_COMPRESSION = 'LZW' module-attribute

TILE_SIZE = 1024 module-attribute

VERSION = 'v0.1.10' module-attribute

ZARR_CHUNK_SIZE = TILE_SIZE module-attribute

ZARR_SHARD_MULTIPLIER = 10 module-attribute

ISyntaxSource

Bases: ImageSource

Loads image and metadata from ISyntax format files.

Source code in src/ISyntaxSource.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class ISyntaxSource(ImageSource):
    """
    Loads image and metadata from ISyntax format files.
    """
    def init_metadata(self):
        """
        Parses the XML metadata header and initializes image properties.

        Returns:
            dict: Parsed metadata.
        """
        # read XML metadata header: the file starts with an XML document
        # terminated by an EOT (0x04) byte
        data = b''
        block_size = 1024 * 1024
        end_char = b'\x04'   # EOT character
        with open(self.uri, mode='rb') as file:
            done = False
            while not done:
                data_block = file.read(block_size)
                if not data_block:
                    done = True     # EOF without EOT terminator; avoid an infinite read loop
                if end_char in data_block:
                    index = data_block.index(end_char)
                    data_block = data_block[:index]
                    done = True
                data += data_block

        self.metadata = xml_content_to_dict(ElementTree.XML(data.decode()))
        if 'DPUfsImport' in self.metadata:
            self.metadata = self.metadata['DPUfsImport']

        # find the WSI image entry; otherwise the last scanned image is kept
        image = None
        image_type = ''
        for image0 in self.metadata.get('PIM_DP_SCANNED_IMAGES', []):
            image = image0.get('DPScannedImage', {})
            # default to '' so a missing type entry does not crash on None.lower()
            image_type = image.get('PIM_DP_IMAGE_TYPE', '').lower()
            if image_type in ['wsi']:
                break

        if image is not None:
            self.image_type = image_type
        else:
            self.image_type = ''

        self.is_plate = 'screen' in self.image_type or 'plate' in self.image_type or 'wells' in self.image_type

        self.isyntax = ISyntax.open(self.uri)
        self.dimensions = self.isyntax.level_dimensions
        self.widths = [width for width, height in self.isyntax.level_dimensions]
        self.heights = [height for width, height in self.isyntax.level_dimensions]
        self.scales = [1 / downsample for downsample in self.isyntax.level_downsamples]

        # original color channels get converted in pyisyntax package to 8-bit RGBA,
        # and read_array() converts to RGB, so the header bit depth is not used here
        nbits = 8
        self.nchannels = 3
        self.shapes = [(height, width, self.nchannels) for (width, height) in self.dimensions]
        self.shape = self.shapes[0]
        self.dim_order = 'yxc'
        self.is_rgb_channels = True
        self.dtype = np.dtype(f'uint{nbits}')
        self.pixel_size = {'x': self.isyntax.mpp_x, 'y': self.isyntax.mpp_y}

        self.name = get_filetitle(self.uri)
        return self.metadata

    def is_screen(self):
        """Returns True if the image is a screen/plate/wells type."""
        return self.is_plate

    def get_shape(self):
        """Returns the shape of the full-resolution level."""
        return self.shape

    def get_shapes(self):
        """Returns the shapes of all pyramid levels."""
        return self.shapes

    def get_scales(self):
        """Returns the scale factor of each pyramid level relative to level 0."""
        return self.scales

    def read_array(self, x, y, width, height, level=0):
        """
        Reads a region as an RGB uint8 array.

        The RGBA output of pyisyntax is converted to RGB by premultiplying
        with the alpha channel.

        Args:
            x (int): Left position in level coordinates.
            y (int): Top position in level coordinates.
            width (int): Region width.
            height (int): Region height.
            level (int): Pyramid level.

        Returns:
            ndarray: RGB image data (height, width, 3).
        """
        rgba = self.isyntax.read_region(x, y, width, height, level)
        alpha = np.atleast_3d(rgba[..., 3] / np.float32(255))
        rgb = (rgba[..., :3] * alpha).astype(np.uint8)
        return rgb

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        """Reads the whole level into memory and reorders to dim_order."""
        data = self.read_array(0, 0, self.widths[level], self.heights[level], level=level)
        return redimension_data(data, self.dim_order, dim_order)

    def get_data_as_dask(self, dim_order, level=0, **kwargs):
        """Returns the level as a lazily-evaluated dask array of tiles."""
        # pyisyntax reads are not thread-safe here; force single-threaded scheduling
        dask.config.set(scheduler='single-threaded')

        def get_lazy_tile(x, y, width, height, level=0):
            # one delayed read per tile, stitched together below
            lazy_array = dask.delayed(self.read_array)(x, y, width, height, level)
            return da.from_delayed(lazy_array, shape=(height, width, self.nchannels), dtype=self.dtype)

        y_chunks, x_chunks = da.core.normalize_chunks(TILE_SIZE, self.shapes[level][:2], dtype=self.dtype)
        y_pos = np.cumsum([0] + list(y_chunks)[:-1])
        x_pos = np.cumsum([0] + list(x_chunks)[:-1])
        data = da.concatenate(
            [da.concatenate(
                [get_lazy_tile(x, y, width, height, level=level)
                 for x, width in zip(x_pos, x_chunks)], axis=1)
             for y, height in zip(y_pos, y_chunks)], axis=0)
        return redimension_data(data, self.dim_order, dim_order)

    def get_data_as_generator(self, dim_order, **kwargs):
        """Returns a generator factory yielding tiles at a requested scale."""
        def data_generator(scale=1):
            level, rescale = get_level_from_scale(self.scales, scale)
            # read larger regions from the source level so that, after rescaling,
            # the yielded tiles are TILE_SIZE
            read_size = int(TILE_SIZE / rescale)
            for y in range(0, self.heights[level], read_size):
                for x in range(0, self.widths[level], read_size):
                    data = self.read_array(x, y, read_size, read_size, level)
                    if rescale != 1:
                        shape = np.multiply(data.shape[:2], rescale).astype(int)
                        data = sk_transform.resize(data, shape, preserve_range=True).astype(data.dtype)
                    yield redimension_data(data, self.dim_order, dim_order)
        return data_generator

    def get_name(self):
        """
        Gets the file title.

        Returns:
            str: Name.
        """
        return self.name

    def get_dim_order(self):
        """
        Returns the dimension order string.

        Returns:
            str: Dimension order.
        """
        return self.dim_order

    def get_pixel_size_um(self):
        """
        Returns the pixel size in micrometers.

        Returns:
            dict: Pixel size dict for x and y.
        """
        return self.pixel_size

    def get_dtype(self):
        """
        Returns the numpy dtype of the image data.

        Returns:
            dtype: Numpy dtype.
        """
        return self.dtype

    def get_position_um(self, well_id=None):
        """
        Returns the scanner offset position in micrometers.

        Returns:
            dict: Position dict for x and y.
        """
        return {'x': self.isyntax.offset_x, 'y': self.isyntax.offset_y}

    def get_channels(self):
        # ISyntax is RGB, return NGFF-style channel metadata
        return [
            {"name": "Red", "color": [1, 0, 0, 1]},
            {"name": "Green", "color": [0, 1, 0, 1]},
            {"name": "Blue", "color": [0, 0, 1, 1]},
            #{"name": "Alpha", "color": [1, 1, 1, 1]}
        ]

    def get_nchannels(self):
        """Returns the number of color channels."""
        return self.nchannels

    def is_rgb(self):
        """Returns True if channels represent RGB color."""
        return self.is_rgb_channels

    def get_rows(self):
        """Returns plate rows (empty: ISyntax is not a plate format)."""
        return []

    def get_columns(self):
        """Returns plate columns (empty: ISyntax is not a plate format)."""
        return []

    def get_wells(self):
        """Returns wells (empty: ISyntax is not a plate format)."""
        return []

    def get_time_points(self):
        """Returns time points (empty: ISyntax has no time dimension)."""
        return []

    def get_fields(self):
        """Returns fields (empty: ISyntax is not a plate format)."""
        return []

    def get_acquisitions(self):
        """Returns acquisitions (empty: ISyntax has no acquisition metadata)."""
        return []

    def get_total_data_size(self):
        """Returns the total uncompressed data size in elements."""
        total_size = np.prod(self.shape)
        if self.is_plate:
            total_size *= len(self.get_wells()) * len(self.get_fields())
        return total_size

    def close(self):
        """Closes the ISyntax file and restores the default dask scheduler."""
        self.isyntax.close()
        dask.config.set(scheduler='threads')

close()

Source code in src/ISyntaxSource.py
213
214
215
def close(self):
    self.isyntax.close()
    dask.config.set(scheduler='threads')

get_acquisitions()

Source code in src/ISyntaxSource.py
204
205
def get_acquisitions(self):
    return []

get_channels()

Source code in src/ISyntaxSource.py
174
175
176
177
178
179
180
181
def get_channels(self):
    # ISyntax is RGB, return NGFF-style channel metadata
    return [
        {"name": "Red", "color": [1, 0, 0, 1]},
        {"name": "Green", "color": [0, 1, 0, 1]},
        {"name": "Blue", "color": [0, 0, 1, 1]},
        #{"name": "Alpha", "color": [1, 1, 1, 1]}
    ]

get_columns()

Source code in src/ISyntaxSource.py
192
193
def get_columns(self):
    return []

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Source code in src/ISyntaxSource.py
95
96
97
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    data = self.read_array(0, 0, self.widths[level], self.heights[level], level=level)
    return redimension_data(data, self.dim_order, dim_order)

get_data_as_dask(dim_order, level=0, **kwargs)

Source code in src/ISyntaxSource.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def get_data_as_dask(self, dim_order, level=0, **kwargs):
    dask.config.set(scheduler='single-threaded')

    def get_lazy_tile(x, y, width, height, level=0):
        lazy_array = dask.delayed(self.read_array)(x, y, width, height, level)
        return da.from_delayed(lazy_array, shape=(height, width, self.nchannels), dtype=self.dtype)

    y_chunks, x_chunks = da.core.normalize_chunks(TILE_SIZE, self.shapes[level][:2], dtype=self.dtype)
    y_pos = np.cumsum([0] + list(y_chunks)[:-1])
    x_pos = np.cumsum([0] + list(x_chunks)[:-1])
    data = da.concatenate(
        [da.concatenate(
            [get_lazy_tile(x, y, width, height, level=level)
             for x, width in zip(x_pos, x_chunks)], axis=1)
         for y, height in zip(y_pos, y_chunks)], axis=0)
    return redimension_data(data, self.dim_order, dim_order)

get_data_as_generator(dim_order, **kwargs)

Source code in src/ISyntaxSource.py
116
117
118
119
120
121
122
123
124
125
126
127
def get_data_as_generator(self, dim_order, **kwargs):
    def data_generator(scale=1):
        level, rescale = get_level_from_scale(self.scales, scale)
        read_size = int(TILE_SIZE / rescale)
        for y in range(0, self.heights[level], read_size):
            for x in range(0, self.widths[level], read_size):
                data = self.read_array(x, y, read_size, read_size, level)
                if rescale != 1:
                    shape = np.multiply(data.shape[:2], rescale).astype(int)
                    data = sk_transform.resize(data, shape, preserve_range=True).astype(data.dtype)
                yield redimension_data(data, self.dim_order, dim_order)
    return data_generator

get_dim_order()

Returns the dimension order string.

Returns:

Name Type Description
str

Dimension order.

Source code in src/ISyntaxSource.py
138
139
140
141
142
143
144
145
def get_dim_order(self):
    """
    Returns the dimension order string.

    Returns:
        str: Dimension order.
    """
    return self.dim_order

get_dtype()

Returns the numpy dtype of the image data.

Returns:

Name Type Description
dtype

Numpy dtype.

Source code in src/ISyntaxSource.py
156
157
158
159
160
161
162
163
def get_dtype(self):
    """
    Returns the numpy dtype of the image data.

    Returns:
        dtype: Numpy dtype.
    """
    return self.dtype

get_fields()

Source code in src/ISyntaxSource.py
201
202
def get_fields(self):
    return []

get_name()

Gets the file title.

Returns:

Name Type Description
str

Name.

Source code in src/ISyntaxSource.py
129
130
131
132
133
134
135
136
def get_name(self):
    """
    Gets the file title.

    Returns:
        str: Name.
    """
    return self.name

get_nchannels()

Source code in src/ISyntaxSource.py
183
184
def get_nchannels(self):
    return self.nchannels

get_pixel_size_um()

Returns the pixel size in micrometers.

Returns:

Name Type Description
dict

Pixel size dict for x and y.

Source code in src/ISyntaxSource.py
147
148
149
150
151
152
153
154
def get_pixel_size_um(self):
    """
    Returns the pixel size in micrometers.

    Returns:
        dict: Pixel size dict for x and y.
    """
    return self.pixel_size

get_position_um(well_id=None)

Returns the scanner offset position in micrometers for x and y.

Returns:

Name Type Description
dict

Position dict for x and y.

Source code in src/ISyntaxSource.py
165
166
167
168
169
170
171
172
def get_position_um(self, well_id=None):
    """
    Returns the scanner offset position in micrometers.

    Returns:
        dict: Position dict for x and y.
    """
    return {'x': self.isyntax.offset_x, 'y': self.isyntax.offset_y}

get_rows()

Source code in src/ISyntaxSource.py
189
190
def get_rows(self):
    return []

get_scales()

Source code in src/ISyntaxSource.py
86
87
def get_scales(self):
    return self.scales

get_shape()

Source code in src/ISyntaxSource.py
80
81
def get_shape(self):
    return self.shape

get_shapes()

Source code in src/ISyntaxSource.py
83
84
def get_shapes(self):
    return self.shapes

get_time_points()

Source code in src/ISyntaxSource.py
198
199
def get_time_points(self):
    return []

get_total_data_size()

Source code in src/ISyntaxSource.py
207
208
209
210
211
def get_total_data_size(self):
    total_size = np.prod(self.shape)
    if self.is_plate:
        total_size *= len(self.get_wells()) * len(self.get_fields())
    return total_size

get_wells()

Source code in src/ISyntaxSource.py
195
196
def get_wells(self):
    return []

init_metadata()

Source code in src/ISyntaxSource.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def init_metadata(self):
    # read XML metadata header
    data = b''
    block_size = 1024 * 1024
    end_char = b'\x04'   # EOT character
    with open(self.uri, mode='rb') as file:
        done = False
        while not done:
            data_block = file.read(block_size)
            if end_char in data_block:
                index = data_block.index(end_char)
                data_block = data_block[:index]
                done = True
            data += data_block

    self.metadata = xml_content_to_dict(ElementTree.XML(data.decode()))
    if 'DPUfsImport' in self.metadata:
        self.metadata = self.metadata['DPUfsImport']

    image = None
    image_type = ''
    for image0 in self.metadata.get('PIM_DP_SCANNED_IMAGES', []):
        image = image0.get('DPScannedImage', {})
        image_type = image.get('PIM_DP_IMAGE_TYPE').lower()
        if image_type in ['wsi']:
            break

    if image is not None:
        self.image_type = image_type
        nbits = image.get('UFS_IMAGE_BLOCK_HEADER_TEMPLATES', [{}])[0].get('UFSImageBlockHeaderTemplate', {}).get('DICOM_BITS_STORED', 16)
        nbits = int(np.ceil(nbits / 8)) * 8
    else:
        self.image_type = ''
        nbits = 16

    self.is_plate = 'screen' in self.image_type or 'plate' in self.image_type or 'wells' in self.image_type

    self.isyntax = ISyntax.open(self.uri)
    self.dimensions = self.isyntax.level_dimensions
    self.widths = [width for width, height in self.isyntax.level_dimensions]
    self.heights = [height for width, height in self.isyntax.level_dimensions]
    self.scales = [1 / downsample for downsample in self.isyntax.level_downsamples]

    # original color channels get converted in pyisyntax package to 8-bit RGBA; convert to RGB
    nbits = 8
    self.nchannels = 3
    self.shapes = [(height, width, self.nchannels) for (width, height) in self.dimensions]
    self.shape = self.shapes[0]
    self.dim_order = 'yxc'
    self.is_rgb_channels = True
    self.dtype = np.dtype(f'uint{nbits}')
    self.pixel_size = {'x': self.isyntax.mpp_x, 'y': self.isyntax.mpp_y}

    self.name = get_filetitle(self.uri)
    return self.metadata

is_rgb()

Source code in src/ISyntaxSource.py
186
187
def is_rgb(self):
    return self.is_rgb_channels

is_screen()

Source code in src/ISyntaxSource.py
77
78
def is_screen(self):
    return self.is_plate

read_array(x, y, width, height, level=0)

Source code in src/ISyntaxSource.py
89
90
91
92
93
def read_array(self, x, y, width, height, level=0):
    rgba = self.isyntax.read_region(x, y, width, height, level)
    alpha = np.atleast_3d(rgba[..., 3] / np.float32(255))
    rgb = (rgba[..., :3] * alpha).astype(np.uint8)
    return rgb

ImageDbSource

ImageDbSource

Bases: ImageSource

Loads image and metadata from a database source for high-content screening.

Source code in src/ImageDbSource.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class ImageDbSource(ImageSource):
    """
    Loads image and metadata from a database source for high-content screening.
    """
    def __init__(self, uri, metadata={}):
        # NOTE(review): mutable default `metadata={}` is shared across instances
        # that omit the argument - consider `metadata=None`.
        super().__init__(uri, metadata)
        self.db = DbReader(self.uri)    # reader for the experiment database
        self.data = None                # cached assembled data for the last well read
        self.data_well_id = None        # well id the cached data belongs to
        self.data_level = None          # resolution level the cached data belongs to
        self.dim_order = 'tczyx'        # dimension order of assembled data

    def init_metadata(self):
        """
        Load all metadata (time series, experiment, wells, image info, sizes)
        and return the populated metadata dict.
        """
        self._get_time_series_info()
        self._get_experiment_metadata()
        self._get_well_info()
        self._get_image_info()
        self._get_sizes()
        return self.metadata

    def get_shape(self):
        """Return the level-0 data shape (t, c, z, y, x)."""
        return self.shape

    def get_shapes(self):
        """Return the data shape for each resolution level."""
        return self.shapes

    def get_scales(self):
        """Return the relative scale factor for each resolution level."""
        return self.scales

    def _get_time_series_info(self):
        """
        Loads time series and image file info into metadata.
        """
        time_series_ids = sorted(self.db.fetch_all('SELECT DISTINCT TimeSeriesElementId FROM SourceImageBase', return_dicts=False))
        self.time_points = time_series_ids

        level_ids = sorted(self.db.fetch_all('SELECT DISTINCT level FROM SourceImageBase', return_dicts=False))
        self.levels = level_ids

        # One raw image file per time point, located next to the database file.
        image_files = {time_series_id: os.path.join(os.path.dirname(self.uri), f'images-{time_series_id}.db')
                       for time_series_id in time_series_ids}
        self.image_files = image_files

    def _get_experiment_metadata(self):
        """
        Loads experiment metadata and acquisition info into metadata.
        """
        creation_info = self.db.fetch_all('SELECT DateCreated, Creator, Name FROM ExperimentBase')[0]
        creation_info['DateCreated'] = convert_dotnet_ticks_to_datetime(creation_info['DateCreated'])
        self.metadata.update(creation_info)

        # Timestamps are stored as .NET ticks; convert to datetime objects.
        acquisitions = self.db.fetch_all('SELECT Name, Description, DateCreated, DateModified FROM AcquisitionExp')
        for acquisition in acquisitions:
            acquisition['DateCreated'] = convert_dotnet_ticks_to_datetime(acquisition['DateCreated'])
            acquisition['DateModified'] = convert_dotnet_ticks_to_datetime(acquisition['DateModified'])
        self.acquisitions = acquisitions

    def _get_well_info(self):
        """
        Loads well and channel information into metadata.
        """
        well_info = self.db.fetch_all('''
            SELECT SensorSizeYPixels, SensorSizeXPixels, Objective, PixelSizeUm, SensorBitness, SitesX, SitesY
            FROM AcquisitionExp, AutomaticZonesParametersExp
        ''')[0]

        # Filter multiple duplicate channel entries
        channel_infos = self.db.fetch_all('''
            SELECT DISTINCT ChannelNumber, Emission, Excitation, Dye, Color
            FROM ImagechannelExp
            ORDER BY ChannelNumber
        ''')
        self.channels = channel_infos
        self.nchannels = len(channel_infos)

        # Derive plate rows/columns from the set of well names (e.g. 'A1').
        wells = self.db.fetch_all('SELECT DISTINCT Name FROM Well')
        zone_names = [well['Name'] for well in wells]
        rows = set()
        cols = set()
        for zone_name in zone_names:
            row, col = split_well_name(zone_name)
            rows.add(row)
            cols.add(col)
        self.rows = sorted(list(rows))
        self.columns = sorted(list(cols), key=lambda x: int(x))
        nfields = well_info['SitesX'] * well_info['SitesY'] * well_info.get('SitesZ', 1)
        self.fields = list(range(nfields))
        self.well_info = well_info
        self.metadata['well_info'] = well_info

        # Only wells that actually contain images, sorted in plate order.
        image_wells = self.db.fetch_all('SELECT Name, ZoneIndex, CoordX, CoordY FROM Well WHERE HasImages = 1')
        self.wells = dict(sorted({well['Name']: well for well in image_wells}.items(),
                                             key=lambda x: split_well_name(x[0], col_as_int=True)))
        self.metadata['wells'] = self.wells
        self.pixel_size = well_info.get('PixelSizeUm', 1)

    def _get_image_info(self):
        """
        Loads image bit depth and dtype info into metadata.
        """
        bits_per_pixel = self.db.fetch_all('SELECT DISTINCT BitsPerPixel FROM SourceImageBase', return_dicts=False)[0]
        self.bits_per_pixel = bits_per_pixel
        # Round up to a whole number of bytes; numpy has no 24-bit dtype, so use 32.
        bits_per_pixel = int(np.ceil(bits_per_pixel / 8)) * 8
        if bits_per_pixel == 24:
            bits_per_pixel = 32
        self.dtype = np.dtype(f'uint{bits_per_pixel}')

    def _get_sizes(self):
        """
        Calculates and stores image shape and estimated data size.
        """
        shapes = []
        scales = []
        widths = []
        heights = []
        width0, height0 = self.well_info['SensorSizeXPixels'], self.well_info['SensorSizeYPixels']
        sizex0, sizey0 = None, None
        # Iterate through levels to get level size factor (SourceImageBase contains field-composite images)
        for level in self.levels:
            level_info = self.db.fetch_all(
                'SELECT MAX(CoordX + SizeX) as width, MAX(CoordY + SizeY) as height FROM SourceImageBase WHERE level = ?',
                [level])
            sizex, sizey = level_info[0]['width'], level_info[0]['height']
            if level == 0:
                sizex0, sizey0 = sizex, sizey
            # Scale the sensor size by this level's extent relative to level 0.
            width, height = width0 * sizex // sizex0, height0 * sizey // sizey0
            widths.append(width)
            heights.append(height)
            shape = len(self.time_points), self.nchannels, 1, height, width
            scale = np.mean([width / widths[0], height / heights[0]])
            shapes.append(shape)
            scales.append(scale)
        self.widths = widths
        self.heights = heights
        self.shape = shapes[0]
        self.shapes = shapes
        self.scales = scales
        # Upper bound for total data size: level-0 well size times wells and fields.
        self.max_data_size = np.prod(self.shape) * self.dtype.itemsize * len(self.wells) * len(self.fields)

    def _read_well_info(self, well_id, channel=None, time_point=None, level=0):
        """
        Reads image info for a specific well, optionally filtered by channel and time point.

        Args:
            well_id (str): Well identifier.
            channel (int, optional): Channel ID.
            time_point (int, optional): Time point ID.
            level (int, optional): Image level index.

        Returns:
            list: Well image info dictionaries.

        Raises:
            ValueError: If the well is unknown or no data matches the filters.
        """
        well_id = strip_leading_zeros(well_id)
        well_ids = self.wells

        if well_id not in well_ids:
            raise ValueError(f'Invalid Well: {well_id}. Available values: {well_ids}')

        zone_index = well_ids[well_id]['ZoneIndex']
        well_info = self.db.fetch_all('''
            SELECT *
            FROM SourceImageBase
            WHERE ZoneIndex = ? AND level = ?
            ORDER BY CoordX ASC, CoordY ASC
        ''', [zone_index, level])

        if channel is not None:
             well_info = [info for info in well_info if info['ChannelId'] == channel]
        if time_point is not None:
             well_info = [info for info in well_info if info['TimeSeriesElementId'] == time_point]
        if not well_info:
            raise ValueError(f'No data found for well {well_id}')
        return well_info

    def _assemble_image_data(self, well_info):
        """
        Assembles image data array using well info.

        Stitches raw tiles (read from the per-timepoint image files at their
        stored byte offsets) into a single (t, c, z, y, x) array in self.data.

        Args:
            well_info (list): List of well image info dicts.
        """
        well_info = np.asarray(well_info)
        xmax = np.max([info['CoordX'] + info['SizeX'] for info in well_info])
        ymax = np.max([info['CoordY'] + info['SizeY'] for info in well_info])
        zmax = np.max([info.get('CoordZ', 0) + info.get('SizeZ', 1) for info in well_info])
        nc = len(set([info['ChannelId'] for info in well_info]))
        nt = len(set([info['TimeSeriesElementId'] for info in well_info]))
        data = np.zeros((nt, nc, zmax, ymax, xmax), dtype=self.dtype)

        for timei, time_id in enumerate(self.time_points):
            image_file = self.image_files[time_id]
            with open(image_file, 'rb') as fid:
                for info in well_info:
                    if info['TimeSeriesElementId'] == time_id:
                        # ImageIndex is the byte offset of this tile in the raw file.
                        fid.seek(info['ImageIndex'])
                        coordx, coordy, coordz = info['CoordX'], info['CoordY'], info.get('CoordZ', 0)
                        sizex, sizey, sizez = info['SizeX'], info['SizeY'], info.get('SizeZ', 1)
                        channeli = info['ChannelId']
                        tile = np.fromfile(fid, dtype=self.dtype, count=sizez * sizey * sizex)
                        data[timei, channeli, coordz:coordz + sizez, coordy:coordy + sizey, coordx:coordx + sizex] = tile.reshape((sizez, sizey, sizex))

        self.data = data

    def _extract_site(self, site_id=None):
        """
        Extracts image data for a specific site or all sites.

        Args:
            site_id (int, optional): Site index. If None, returns all data.
                A negative value returns a list with every field.

        Returns:
            ndarray or list: Image data for the site(s).

        Raises:
            ValueError: If site_id is out of range.
        """
        well_info = self.well_info
        sitesx = well_info['SitesX']
        sitesy = well_info['SitesY']
        sitesz = well_info.get('SitesZ', 1)
        nfields = len(self.fields)
        sizex = well_info['SensorSizeXPixels']
        sizey = well_info['SensorSizeYPixels']
        sizez = well_info.get('SensorSizeZPixels', 1)

        if site_id is None:
            # Return full image data
            return self.data

        site_id = int(site_id)
        if site_id < 0:
            # Return list of all fields
            data = []
            for zi in range(sitesz):
                for yi in range(sitesy):
                    for xi in range(sitesx):
                        startx = xi * sizex
                        starty = yi * sizey
                        startz = zi * sizez
                        data.append(self.data[..., startz:startz + sizez, starty:starty + sizey, startx:startx + sizex])
            return data
        elif 0 <= site_id < nfields:
            # Return specific site
            xi = site_id % sitesx
            yi = (site_id // sitesx) % sitesy
            zi = site_id // sitesx // sitesy
            startx = xi * sizex
            starty = yi * sizey
            startz = zi * sizez
            return self.data[..., startz:startz + sizez, starty:starty + sizey, startx:startx + sizex]
        else:
            raise ValueError(f'Invalid site: {site_id}')

    def is_screen(self):
        """Return whether this source contains any image wells."""
        return len(self.wells) > 0

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        """
        Return image data for a well/field in the requested dimension order.
        Re-assembles data only when the well or level changed since the last call.
        """
        if not (well_id == self.data_well_id and level == self.data_level):
            self._assemble_image_data(self._read_well_info(well_id, level=level))
            self.data_well_id = well_id
            self.data_level = level
        return redimension_data(self._extract_site(field_id), self.dim_order, dim_order)

    def get_name(self):
        """Return the experiment name, falling back to the uri's parent folder name."""
        name = self.metadata.get('Name')
        if not name:
            name = splitall(os.path.splitext(self.uri)[0])[-2]
        return name

    def get_rows(self):
        """Return the sorted plate row labels."""
        return self.rows

    def get_columns(self):
        """Return the plate column labels, sorted numerically."""
        return self.columns

    def get_wells(self):
        """Return the well names (keys of self.wells)."""
        return list(self.wells)

    def get_time_points(self):
        """Return the sorted time series element ids."""
        return self.time_points

    def get_fields(self):
        """Return the field (site) indices."""
        return self.fields

    def get_dim_order(self):
        """Return the native dimension order string ('tczyx')."""
        return self.dim_order

    def get_dtype(self):
        """Return the numpy dtype of the image data."""
        return self.dtype

    def get_pixel_size_um(self):
        """Return the pixel size in micrometers for x and y."""
        return {'x': self.pixel_size, 'y': self.pixel_size}

    def get_position_um(self, well_id=None, level=0):
        """Return the well position in micrometers at the given level."""
        well = self.wells[well_id]
        x = well.get('CoordX', 0) * self.widths[level] * self.pixel_size
        y = well.get('CoordY', 0) * self.heights[level] * self.pixel_size
        return {'x': x, 'y': y}

    def get_channels(self):
        """Return per-channel display info: label from Dye, color from Color."""
        channels = []
        for channel0 in self.channels:
            channel = {}
            if 'Dye' in channel0 and channel0['Dye']:
                channel['label'] = channel0['Dye']
            if 'Color' in channel0:
                channel['color'] = hexrgb_to_rgba(channel0['Color'].lstrip('#'))
            channels.append(channel)
        return channels

    def get_nchannels(self):
        """Return the channel count, at least 1."""
        return max(self.nchannels, 1)

    def is_rgb(self):
        """Channels are stored separately; data is never interleaved RGB."""
        return False

    def get_acquisitions(self):
        """Return acquisition metadata as plain dicts with ISO-formatted dates."""
        acquisitions = []
        for index, acq in enumerate(self.acquisitions):
            acquisitions.append({
                'id': index,
                'name': acq['Name'],
                'description': acq['Description'],
                'date_created': acq['DateCreated'].isoformat(),
                'date_modified': acq['DateModified'].isoformat()
            })
        return acquisitions

    def get_total_data_size(self):
        """Return the estimated total (maximum) data size in bytes."""
        return self.max_data_size

    def print_timepoint_well_matrix(self):
        """Return a text matrix marking which wells have images per time point."""
        s = ''

        time_points = self.time_points
        wells = [well for well in self.wells]

        well_matrix = []
        for timepoint in time_points:
            wells_at_timepoint = self.db.fetch_all('''
                SELECT DISTINCT Well.Name FROM SourceImageBase
                JOIN Well ON SourceImageBase.ZoneIndex = Well.ZoneIndex
                WHERE TimeSeriesElementId = ?
            ''', [timepoint], return_dicts=False)

            row = ['+' if well in wells_at_timepoint else ' ' for well in wells]
            well_matrix.append(row)

        header = ' '.join([pad_leading_zero(well) for well in wells])
        s += 'Timepoint ' + header + '\n'
        for idx, row in enumerate(well_matrix):
            s += f'{time_points[idx]:9}  ' + '   '.join(row) + '\n'
        return s

    def close(self):
        """
        Closes the database connection.
        """
        self.db.close()

data = None instance-attribute

data_level = None instance-attribute

data_well_id = None instance-attribute

db = DbReader(self.uri) instance-attribute

dim_order = 'tczyx' instance-attribute

__init__(uri, metadata={})

Source code in src/ImageDbSource.py
17
18
19
20
21
22
23
def __init__(self, uri, metadata=None):
    """
    Initialize the database-backed image source.

    Args:
        uri (str): Path to the experiment database file.
        metadata (dict, optional): Initial metadata. A fresh dict is created
            when omitted.
    """
    # A `metadata={}` default would be a single dict shared by every instance
    # that omits the argument (classic mutable-default bug); create it per call.
    super().__init__(uri, metadata if metadata is not None else {})
    self.db = DbReader(self.uri)    # reader for the experiment database
    self.data = None                # cached assembled data for the last well read
    self.data_well_id = None        # well id the cached data belongs to
    self.data_level = None          # resolution level the cached data belongs to
    self.dim_order = 'tczyx'        # dimension order of assembled data

close()

Closes the database connection.

Source code in src/ImageDbSource.py
364
365
366
367
368
def close(self):
    """
    Closes the database connection (delegates to the underlying DbReader).
    """
    self.db.close()

get_acquisitions()

Source code in src/ImageDbSource.py
326
327
328
329
330
331
332
333
334
335
336
def get_acquisitions(self):
    """Return acquisition metadata as plain dicts with ISO-formatted dates."""
    return [
        {
            'id': index,
            'name': acq['Name'],
            'description': acq['Description'],
            'date_created': acq['DateCreated'].isoformat(),
            'date_modified': acq['DateModified'].isoformat()
        }
        for index, acq in enumerate(self.acquisitions)
    ]

get_channels()

Source code in src/ImageDbSource.py
309
310
311
312
313
314
315
316
317
318
def get_channels(self):
    """Return per-channel display info: label from Dye, color from Color."""
    channels = []
    for info in self.channels:
        entry = {}
        if 'Dye' in info and info['Dye']:
            entry['label'] = info['Dye']
        if 'Color' in info:
            # Color is stored as a hex string, possibly '#'-prefixed.
            entry['color'] = hexrgb_to_rgba(info['Color'].lstrip('#'))
        channels.append(entry)
    return channels

get_columns()

Source code in src/ImageDbSource.py
282
283
def get_columns(self):
    """Return the plate column labels."""
    return self.columns

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Source code in src/ImageDbSource.py
266
267
268
269
270
271
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    """
    Return image data for a well/field in the requested dimension order.

    The assembled well data is cached; it is rebuilt only when the requested
    well or level differs from the cached one.
    """
    cache_hit = well_id == self.data_well_id and level == self.data_level
    if not cache_hit:
        well_info = self._read_well_info(well_id, level=level)
        self._assemble_image_data(well_info)
        self.data_well_id = well_id
        self.data_level = level
    site_data = self._extract_site(field_id)
    return redimension_data(site_data, self.dim_order, dim_order)

get_dim_order()

Source code in src/ImageDbSource.py
294
295
def get_dim_order(self):
    """Return the native dimension order string."""
    return self.dim_order

get_dtype()

Source code in src/ImageDbSource.py
297
298
def get_dtype(self):
    """Return the numpy dtype of the image data."""
    return self.dtype

get_fields()

Source code in src/ImageDbSource.py
291
292
def get_fields(self):
    """Return the field (site) indices."""
    return self.fields

get_name()

Source code in src/ImageDbSource.py
273
274
275
276
277
def get_name(self):
    """Return the experiment name, falling back to the uri's parent folder name."""
    name = self.metadata.get('Name')
    if name:
        return name
    # No name in metadata: use the second-to-last path component of the uri.
    return splitall(os.path.splitext(self.uri)[0])[-2]

get_nchannels()

Source code in src/ImageDbSource.py
320
321
def get_nchannels(self):
    """Return the channel count, at least 1."""
    return self.nchannels if self.nchannels > 1 else 1

get_pixel_size_um()

Source code in src/ImageDbSource.py
300
301
def get_pixel_size_um(self):
    """Return the pixel size in micrometers for x and y."""
    size = self.pixel_size
    return {'x': size, 'y': size}

get_position_um(well_id=None, level=0)

Source code in src/ImageDbSource.py
303
304
305
306
307
def get_position_um(self, well_id=None, level=0):
    """Return the well position in micrometers at the given resolution level."""
    well = self.wells[well_id]
    # Well coordinates are in well units; scale by pixel dimensions and size.
    step_x = self.widths[level] * self.pixel_size
    step_y = self.heights[level] * self.pixel_size
    return {'x': well.get('CoordX', 0) * step_x,
            'y': well.get('CoordY', 0) * step_y}

get_rows()

Source code in src/ImageDbSource.py
279
280
def get_rows(self):
    """Return the plate row labels."""
    return self.rows

get_scales()

Source code in src/ImageDbSource.py
39
40
def get_scales(self):
    """Return the relative scale factor for each resolution level."""
    return self.scales

get_shape()

Source code in src/ImageDbSource.py
33
34
def get_shape(self):
    """Return the level-0 data shape."""
    return self.shape

get_shapes()

Source code in src/ImageDbSource.py
36
37
def get_shapes(self):
    """Return the data shape for each resolution level."""
    return self.shapes

get_time_points()

Source code in src/ImageDbSource.py
288
289
def get_time_points(self):
    """Return the time series element ids."""
    return self.time_points

get_total_data_size()

Source code in src/ImageDbSource.py
338
339
def get_total_data_size(self):
    """Return the estimated total (maximum) data size in bytes."""
    return self.max_data_size

get_wells()

Source code in src/ImageDbSource.py
285
286
def get_wells(self):
    """Return the well names (keys of the wells mapping)."""
    return [name for name in self.wells]

init_metadata()

Source code in src/ImageDbSource.py
25
26
27
28
29
30
31
def init_metadata(self):
    """
    Load all metadata (time series, experiment, wells, image info, sizes)
    and return the populated metadata dict.
    """
    self._get_time_series_info()
    self._get_experiment_metadata()
    self._get_well_info()
    self._get_image_info()
    self._get_sizes()
    return self.metadata

is_rgb()

Source code in src/ImageDbSource.py
323
324
def is_rgb(self):
    """Channels are stored separately; data is never interleaved RGB."""
    return False

is_screen()

Source code in src/ImageDbSource.py
263
264
def is_screen(self):
    """Return whether this source contains any image wells."""
    return bool(self.wells)

print_timepoint_well_matrix()

Source code in src/ImageDbSource.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def print_timepoint_well_matrix(self):
    """
    Return a text table marking ('+') which wells have images per time point.
    """
    s = ''

    time_points = self.time_points
    wells = [well for well in self.wells]

    well_matrix = []
    for timepoint in time_points:
        # Wells that actually have source images at this time point.
        wells_at_timepoint = self.db.fetch_all('''
            SELECT DISTINCT Well.Name FROM SourceImageBase
            JOIN Well ON SourceImageBase.ZoneIndex = Well.ZoneIndex
            WHERE TimeSeriesElementId = ?
        ''', [timepoint], return_dicts=False)

        row = ['+' if well in wells_at_timepoint else ' ' for well in wells]
        well_matrix.append(row)

    header = ' '.join([pad_leading_zero(well) for well in wells])
    s += 'Timepoint ' + header + '\n'
    for idx, row in enumerate(well_matrix):
        s += f'{time_points[idx]:9}  ' + '   '.join(row) + '\n'
    return s

convert_dotnet_ticks_to_datetime(net_ticks)

Source code in src/util.py
137
138
def convert_dotnet_ticks_to_datetime(net_ticks):
    """Convert .NET ticks (100 ns units since 0001-01-01) to a datetime."""
    # One tick is 100 ns, i.e. a tenth of a microsecond.
    microseconds = net_ticks // 10
    return datetime(1, 1, 1) + timedelta(microseconds=microseconds)

convert_to_um(value, unit)

Source code in src/util.py
184
185
186
187
188
189
190
191
192
def convert_to_um(value, unit):
    """Convert a length value in the given unit to micrometers.

    Unknown units are passed through unchanged (factor 1).
    """
    factors = {
        'nm': 1e-3,
        'µm': 1, 'um': 1, 'micrometer': 1, 'micron': 1,
        'mm': 1e3, 'millimeter': 1e3,
        'cm': 1e4, 'centimeter': 1e4,
        'm': 1e6, 'meter': 1e6
    }
    factor = factors.get(unit, 1)
    return value * factor

ensure_list(item)

Source code in src/util.py
 7
 8
 9
10
def ensure_list(item):
    """Wrap item in a list unless it is already a list or tuple."""
    if isinstance(item, (list, tuple)):
        return item
    return [item]

get_filetitle(filename)

Source code in src/util.py
68
69
def get_filetitle(filename):
    """Return the file name without directory and without the last extension."""
    without_ext = os.path.splitext(filename)[0]
    return os.path.basename(without_ext)

get_level_from_scale(source_scales, target_scale=1)

Source code in src/util.py
54
55
56
57
58
59
60
61
def get_level_from_scale(source_scales, target_scale=1):
    """Find the pyramid level best matching target_scale.

    Returns (level, residual_scale): residual_scale is the factor still to be
    applied on top of that level (1 on an exact match).
    """
    best = (0, target_scale)
    for level, level_scale in enumerate(source_scales):
        # Exact (within tolerance) match: nothing left to rescale.
        if np.isclose(level_scale, target_scale, rtol=1e-4):
            return level, 1
        # Otherwise remember the deepest level not exceeding the target.
        if level_scale <= target_scale:
            best = (level, target_scale / level_scale)
    return best

get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size)

Source code in src/util.py
39
40
41
42
43
44
45
46
47
48
49
50
51
def get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size):
    """Slice a numpy array by named dimensions.

    Selects single indices for t/c/z (when present in dim_order) and windows
    of y_size/x_size starting at y/x. Raises ValueError when 'x' or 'y' is
    missing from dim_order.
    """
    x_axis = dim_order.index('x')
    y_axis = dim_order.index('y')
    selection = {y_axis: slice(y, y + y_size), x_axis: slice(x, x + x_size)}
    for dim, value in (('t', t), ('c', c), ('z', z)):
        if dim in dim_order:
            selection[dim_order.index(dim)] = value
    selector = tuple(selection.get(axis, slice(None)) for axis in range(len(dim_order)))
    return data[selector]

get_rows_cols_plate(nwells)

Source code in src/util.py
122
123
124
125
126
127
128
129
130
131
132
133
134
def get_rows_cols_plate(nwells):
    """Return (row letters, column numbers as strings) for a standard plate.

    Supports 6/12/24/48/96/384-well plates; raises KeyError otherwise.
    """
    layouts = {
        6: (2, 3),
        12: (3, 4),
        24: (4, 6),
        48: (6, 8),
        96: (8, 12),
        384: (16, 24)
    }
    nrows, ncols = layouts[nwells]
    row_labels = [chr(ord('A') + index) for index in range(nrows)]
    col_labels = [str(index + 1) for index in range(ncols)]
    return row_labels, col_labels

pad_leading_zero(input_string, num_digits=2)

Source code in src/util.py
104
105
106
107
108
109
110
111
112
113
114
def pad_leading_zero(input_string, num_digits=2):
    """Zero-pad a number, or the column part of a well name, to num_digits."""
    text = str(input_string)
    prefix = ''
    # Well names start with a non-digit row label; pad only the column part.
    if not text[0].isdigit():
        row, col = split_well_name(text, remove_leading_zeros=True)
        prefix, text = row, str(col)
    return prefix + text.zfill(num_digits)

print_dict(value, tab=0, max_len=250, bullet=False)

Source code in src/util.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def print_dict(value, tab=0, max_len=250, bullet=False):
    """
    Render a (possibly nested) dict as an indented multi-line string.

    Args:
        value: Dict to render; non-dict values are stringified in place.
        tab (int): Current indentation depth (tabs).
        max_len (int): Truncate long leaf values to this length.
        bullet (bool): Prefix the first key at this level with '-'
            (used for list items).
    """
    s = ''
    if isinstance(value, dict):
        for key, subvalue in value.items():
            s += '\n'
            if bullet:
                s += '-'
                bullet = False  # only the first entry of a list item gets the bullet
            s += '\t' * tab + str(key) + ': '
            if isinstance(subvalue, dict):
                s += print_dict(subvalue, tab+1)
            elif isinstance(subvalue, list):
                for v in subvalue:
                    s += print_dict(v, tab+1, bullet=True)
            else:
                subvalue = str(subvalue)
                if len(subvalue) > max_len:
                    subvalue = subvalue[:max_len] + '...'
                s += subvalue
    else:
        s += str(value) + ' '
    return s

print_hbytes(nbytes)

Source code in src/util.py
219
220
221
222
223
224
225
226
227
228
229
230
def print_hbytes(nbytes):
    """Format a byte count as a human-readable string, e.g. '2.0KB'."""
    prefixes = ['', 'K', 'M', 'G', 'T', 'P', 'E']
    exp = 0
    while nbytes > 1024:
        nbytes /= 1024
        exp += 1
    # Beyond known prefixes, fall back to a power-of-ten exponent notation.
    prefix = prefixes[exp] if exp < len(prefixes) else f'e{exp * 3}'
    return f'{nbytes:.1f}{prefix}B'

redimension_data(data, old_order, new_order, **indices)

Source code in src/util.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def redimension_data(data, old_order, new_order, **indices):
    """Rearrange array axes from old_order to new_order (dimension letters).

    Dimensions absent from new_order are dropped by selecting one index
    (given via keyword, e.g. t=0, z=0; default 0); dimensions absent from
    old_order are added as singleton axes.
    """
    if old_order == new_order:
        return data

    result = data
    order = old_order
    # Drop dimensions missing from the target order by selecting one index.
    for dim in old_order:
        if dim in new_order:
            continue
        axis = order.index(dim)
        result = np.take(result, indices=indices.get(dim, 0), axis=axis)
        order = order[:axis] + order[axis + 1:]
    # Prepend missing dimensions as singleton axes.
    for dim in new_order:
        if dim not in order:
            result = np.expand_dims(result, 0)
            order = dim + order
    # Permute the remaining axes into the requested order.
    source_axes = [order.index(dim) for dim in new_order]
    target_axes = list(range(len(new_order)))
    return np.moveaxis(result, source_axes, target_axes)

split_well_name(well_name, remove_leading_zeros=True, col_as_int=False)

Source code in src/util.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def split_well_name(well_name, remove_leading_zeros=True, col_as_int=False):
    """Split a well name like 'A01' into its row and column parts.

    Args:
        well_name (str): Name such as 'A1' or 'B02'.
        remove_leading_zeros (bool): Strip leading zeros from the column.
        col_as_int (bool): Return the column as an int instead of a str.

    Raises:
        ValueError: If the name does not match row-letters + column-digits.
    """
    match = re.search(r'(\D+)(\d+)', well_name)
    if match is None:
        raise ValueError(f"Invalid well name format: {well_name}. Expected format like 'A1', 'B2', etc.")
    row, col = match.groups()
    if col_as_int or remove_leading_zeros:
        try:
            col = int(col)
        except ValueError:
            pass
    if not col_as_int:
        col = str(col)
    return row, col

splitall(path)

Source code in src/util.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def splitall(path):
    """Split a path into a list of all of its components."""
    parts = []
    while True:
        head, tail = os.path.split(path)
        if head == path:  # sentinel for absolute paths
            parts.insert(0, head)
            break
        if tail == path:  # sentinel for relative paths
            parts.insert(0, tail)
            break
        parts.insert(0, tail)
        path = head
    return parts

strip_leading_zeros(well_name)

Source code in src/util.py
117
118
119
def strip_leading_zeros(well_name):
    """Return the well name with leading zeros removed from the column part."""
    row, col = split_well_name(well_name, remove_leading_zeros=True)
    return '{}{}'.format(row, col)

validate_filename(filename)

Source code in src/util.py
64
65
def validate_filename(filename):
    """Replace characters unsafe for file names with underscores."""
    # Allowed: word characters, '_', '.', parentheses and '-'.
    forbidden = r'[^\w_.)(-]'
    return re.sub(forbidden, '_', filename)

xml_content_to_dict(element)

Source code in src/util.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def xml_content_to_dict(element):
    """
    Recursively convert an XML element tree into nested dicts/lists.

    'Array' elements become lists; leaf text is coerced to float/int/bool
    where possible (quoted text is kept as a string with quotes stripped).
    'DataObject' and 'Attribute' elements are keyed by their ObjectType /
    Name attribute instead of the tag name.
    """
    key = element.tag
    children = list(element)
    if key == 'Array':
        res = [xml_content_to_dict(child) for child in children]
        return res
    if len(children) > 0:
        # Children that are arrays merge into a list; otherwise into a dict.
        if children[0].tag == 'Array':
            value = []
        else:
            value = {}
        for child in children:
            child_value = xml_content_to_dict(child)
            if isinstance(child_value, list):
                value.extend(child_value)
            else:
                value |= child_value
    else:
        value = element.text
        if value is not None:
            if '"' in value:
                # Quoted text: treat as a plain string, quotes removed.
                value = value.replace('"', '')
            else:
                # Try numeric/boolean coercion; first successful type wins.
                for t in (float, int, bool):
                    try:
                        if t == bool:
                            if value.lower() == 'true':
                                value = True
                            if value.lower() == 'false':
                                value = False
                        else:
                            value = t(value)
                        break
                    except (TypeError, ValueError):
                        pass

    if key == 'DataObject':
        key = element.attrib['ObjectType']
    if key == 'Attribute':
        key = element.attrib['Name']
    return {key: value}

ImageSource

ImageSource

Bases: ABC

Abstract base class for image sources.

Source code in src/ImageSource.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class ImageSource(ABC):
    """
    Abstract base class for image sources.
    """

    def __init__(self, uri, metadata=None):
        """
        Initialize ImageSource.

        Args:
            uri (str): Path to the image source.
            metadata (dict, optional): Optional metadata dictionary.
                Defaults to a fresh empty dict per instance.
        """
        self.uri = uri
        # Use None as the default instead of {}: a mutable default argument
        # would be shared between every instance that omits the parameter.
        self.metadata = metadata if metadata is not None else {}

    def init_metadata(self):
        """
        Initialize and load metadata.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'init_metadata' method must be implemented by subclasses.")

    def is_screen(self):
        """
        Check if the source is a screen (multi-well).

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'is_screen' method must be implemented by subclasses.")

    def get_shape(self):
        """
        Get the shape of the image data.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_shape' method must be implemented by subclasses.")

    def get_shapes(self):
        """
        Get a list of shapes corresponding to the image data levels.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_shapes' method must be implemented by subclasses.")

    def get_scales(self):
        """
        Get the list of image scales.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_scales' method must be implemented by subclasses.")

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        """
        Get image data for a well and field.

        Args:
            dim_order: Dimension order of data
            level (int, optional): Image resolution level
            well_id (str, optional): Well identifier
            field_id (int, optional): Field identifier
            kwargs (optional): Format specific keyword arguments.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_data' method must be implemented by subclasses.")

    def get_data_as_dask(self, dim_order, level=0, **kwargs):
        """
        Get image data (WSI) as dask array.

        Default implementation simply delegates to get_data(); subclasses
        may override to return a lazily-evaluated dask array.

        Args:
            dim_order: Dimension order of data
            level (int, optional): Image resolution level
            kwargs (optional): Format specific keyword arguments.
        """
        return self.get_data(dim_order, level=level, **kwargs)

    def get_data_as_generator(self, dim_order, **kwargs):
        """
        Get image data (WSI) as generator.

        Default implementation simply delegates to get_data(); subclasses
        may override to yield chunks lazily.

        Args:
            dim_order: Dimension order of data
            kwargs (optional): Format specific keyword arguments.
        """
        return self.get_data(dim_order, **kwargs)

    def get_image_window(self, window_scanner, well_id=None, field_id=None, data=None):
        """
        Get image value range window (for a well & field or from provided data).

        Args:
            window_scanner (WindowScanner): WindowScanner object to compute window.
            well_id (str, optional): Well identifier
            field_id (int, optional): Field identifier
            data (ndarray, optional): Image data to compute window from.
        """
        # For RGB(A) uint8 images don't change color value range
        dtype = self.get_dtype()    # hoisted: subclasses may compute this per call
        if dtype != np.uint8:
            if data is None:
                # Pick the first (highest-resolution) level small enough to scan.
                for level, shape in enumerate(self.get_shapes()):
                    if np.prod(shape) * dtype.itemsize < 1e8:  # less than 100 MB
                        data = self.get_data(self.get_dim_order(), well_id=well_id, field_id=field_id, level=level)
                        break
            if data is not None:
                window_scanner.process(data, self.get_dim_order())
        return window_scanner.get_window()

    def get_name(self):
        """
        Get the name of the image source.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_name' method must be implemented by subclasses.")

    def get_dim_order(self):
        """
        Get the dimension order string.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_dim_order' method must be implemented by subclasses.")

    def get_dtype(self):
        """
        Get the numpy dtype of the image data.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_dtype' method must be implemented by subclasses.")

    def get_pixel_size_um(self):
        """
        Get the pixel size in micrometers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_pixel_size_um' method must be implemented by subclasses.")

    def get_position_um(self, well_id=None):
        """
        Get the position in micrometers for a well.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_position_um' method must be implemented by subclasses.")

    def get_channels(self):
        """
        Get channel metadata in NGFF format, color provided as RGBA list with values between 0 and 1
        e.g. white = [1, 1, 1, 1]

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_channels' method must be implemented by subclasses.")

    def get_nchannels(self):
        """
        Get the number of channels.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_nchannels' method must be implemented by subclasses.")

    def is_rgb(self):
        """
        Check if the source is a RGB(A) image.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'is_rgb' method must be implemented by subclasses.")

    def get_rows(self):
        """
        Get the list of row identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_rows' method must be implemented by subclasses.")

    def get_columns(self):
        """
        Get the list of column identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_columns' method must be implemented by subclasses.")

    def get_wells(self):
        """
        Get the list of well identifiers.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_wells' method must be implemented by subclasses.")

    def get_time_points(self):
        """
        Get the list of time points.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_time_points' method must be implemented by subclasses.")

    def get_fields(self):
        """
        Get the list of field indices.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_fields' method must be implemented by subclasses.")

    def get_acquisitions(self):
        """
        Get acquisition metadata.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_acquisitions' method must be implemented by subclasses.")

    def get_total_data_size(self):
        """
        Get the estimated total data size.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError("The 'get_total_data_size' method must be implemented by subclasses.")

    def close(self):
        """
        Close the image source. Default implementation is a no-op.
        """
        pass

    def print_well_matrix(self):
        """
        Build and return a string matrix representation of the well plate,
        marking used wells with '+' and unused ones with a space.
        """
        s = ''

        rows, cols = self.get_rows(), self.get_columns()
        # Set for O(1) membership tests instead of scanning a list per well.
        used_wells = set(self.get_wells())

        well_matrix = []
        for row_id in rows:
            row = ''
            for col_id in cols:
                well_id = f'{row_id}{col_id}'
                row += '+' if well_id in used_wells else ' '
            well_matrix.append(row)

        header = ' '.join([pad_leading_zero(col) for col in cols])
        s += ' ' + header + '\n'
        for idx, row in enumerate(well_matrix):
            s += f'{rows[idx]} ' + '  '.join(row) + '\n'
        return s

metadata = metadata instance-attribute

uri = uri instance-attribute

__init__(uri, metadata={})

Initialize ImageSource.

Parameters:

Name Type Description Default
uri str

Path to the image source.

required
metadata dict

Optional metadata dictionary.

{}
Source code in src/ImageSource.py
12
13
14
15
16
17
18
19
20
21
def __init__(self, uri, metadata={}):
    """
    Initialize ImageSource.

    Args:
        uri (str): Path to the image source.
        metadata (dict): Optional metadata dictionary.
    """
    self.uri = uri
    self.metadata = metadata

close()

Close the image source.

Source code in src/ImageSource.py
259
260
261
262
263
def close(self):
    """
    Close the image source.
    """
    pass

get_acquisitions()

Get acquisition metadata.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
241
242
243
244
245
246
247
248
def get_acquisitions(self):
    """
    Get acquisition metadata.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_acquisitions' method must be implemented by subclasses.")

get_channels()

Get channel metadata in NGFF format, color provided as RGBA list with values between 0 and 1 e.g. white = [1, 1, 1, 1]

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
171
172
173
174
175
176
177
178
179
def get_channels(self):
    """
    Get channel metadata in NGFF format, color provided as RGBA list with values between 0 and 1
    e.g. white = [1, 1, 1, 1]

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_channels' method must be implemented by subclasses.")

get_columns()

Get the list of column identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
205
206
207
208
209
210
211
212
def get_columns(self):
    """
    Get the list of column identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_columns' method must be implemented by subclasses.")

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Get image data for a well and field.

Parameters:

Name Type Description Default
dim_order

Dimension order of data

required
level int

Image resolution level

0
well_id str

Well identifier

None
field_id int

Field identifier

None
kwargs optional

Format specific keyword arguments.

{}

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    """
    Get image data for a well and field.

    Args:
        dim_order: Dimension order of data
        level (int, optional): Image resolution level
        well_id (str, optional): Well identifier
        field_id (int, optional): Field identifier
        kwargs (optional): Format specific keyword arguments.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_data' method must be implemented by subclasses.")

get_data_as_dask(dim_order, level=0, **kwargs)

Get image data (WSI) as dask array.

Parameters:

Name Type Description Default
dim_order

Dimension order of data

required
level int

Image resolution level

0
kwargs optional

Format specific keyword arguments.

{}
Source code in src/ImageSource.py
84
85
86
87
88
89
90
91
92
93
def get_data_as_dask(self, dim_order, level=0, **kwargs):
    """
    Get image data (WSI) as dask array.

    Args:
        dim_order: Dimension order of data
        level (int, optional): Image resolution level
        kwargs (optional): Format specific keyword arguments.
    """
    return self.get_data(dim_order, level=level, **kwargs)

get_data_as_generator(dim_order, **kwargs)

Get image data (WSI) as generator.

Parameters:

Name Type Description Default
dim_order

Dimension order of data

required
kwargs optional

Format specific keyword arguments.

{}
Source code in src/ImageSource.py
 95
 96
 97
 98
 99
100
101
102
103
def get_data_as_generator(self, dim_order, **kwargs):
    """
    Get image data (WSI) as generator.

    Args:
        dim_order: Dimension order of data
        kwargs (optional): Format specific keyword arguments.
    """
    return self.get_data(dim_order, **kwargs)

get_dim_order()

Get the dimension order string.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
135
136
137
138
139
140
141
142
def get_dim_order(self):
    """
    Get the dimension order string.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_dim_order' method must be implemented by subclasses.")

get_dtype()

Get the numpy dtype of the image data.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
144
145
146
147
148
149
150
151
def get_dtype(self):
    """
    Get the numpy dtype of the image data.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_dtype' method must be implemented by subclasses.")

get_fields()

Get the list of field indices.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
232
233
234
235
236
237
238
239
def get_fields(self):
    """
    Get the list of field indices.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_fields' method must be implemented by subclasses.")

get_image_window(window_scanner, well_id=None, field_id=None, data=None)

Get image value range window (for a well & field or from provided data).

Parameters:

Name Type Description Default
window_scanner WindowScanner

WindowScanner object to compute window.

required
well_id str

Well identifier

None
field_id int

Field identifier

None
data ndarray

Image data to compute window from.

None
Source code in src/ImageSource.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_image_window(self, window_scanner, well_id=None, field_id=None, data=None):
    """
    Get image value range window (for a well & field or from provided data).

    Args:
        window_scanner (WindowScanner): WindowScanner object to compute window.
        well_id (str, optional): Well identifier
        field_id (int, optional): Field identifier
        data (ndarray, optional): Image data to compute window from.
    """
    # For RGB(A) uint8 images don't change color value range
    if self.get_dtype() != np.uint8:
        if data is None:
            for level, shape in enumerate(self.get_shapes()):
                if np.prod(shape) * self.get_dtype().itemsize < 1e8:  # less than 100 MB
                    data = self.get_data(self.get_dim_order(), well_id=well_id, field_id=field_id, level=level)
                    break
        if data is not None:
            window_scanner.process(data, self.get_dim_order())
    return window_scanner.get_window()

get_name()

Get the name of the image source.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
126
127
128
129
130
131
132
133
def get_name(self):
    """
    Get the name of the image source.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_name' method must be implemented by subclasses.")

get_nchannels()

Get the number of channels.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
181
182
183
184
185
186
187
188
def get_nchannels(self):
    """
    Get the number of channels.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_nchannels' method must be implemented by subclasses.")

get_pixel_size_um()

Get the pixel size in micrometers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
153
154
155
156
157
158
159
160
def get_pixel_size_um(self):
    """
    Get the pixel size in micrometers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_pixel_size_um' method must be implemented by subclasses.")

get_position_um(well_id=None)

Get the position in micrometers for a well.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
162
163
164
165
166
167
168
169
def get_position_um(self, well_id=None):
    """
    Get the position in micrometers for a well.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_position_um' method must be implemented by subclasses.")

get_rows()

Get the list of row identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
196
197
198
199
200
201
202
203
def get_rows(self):
    """
    Get the list of row identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_rows' method must be implemented by subclasses.")

get_scales()

Get the list of image scales.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
59
60
61
62
63
64
65
66
def get_scales(self):
    """
    Get the list of image scales.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_scales' method must be implemented by subclasses.")

get_shape()

Get the shape of the image data.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
41
42
43
44
45
46
47
48
def get_shape(self):
    """
    Get the shape of the image data.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_shape' method must be implemented by subclasses.")

get_shapes()

Get a list of shapes corresponding to the image data levels.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
50
51
52
53
54
55
56
57
def get_shapes(self):
    """
    Get a list of shapes corresponding to the image data levels.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_shapes' method must be implemented by subclasses.")

get_time_points()

Get the list of time points.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
223
224
225
226
227
228
229
230
def get_time_points(self):
    """
    Get the list of time points.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_time_points' method must be implemented by subclasses.")

get_total_data_size()

Get the estimated total data size.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
250
251
252
253
254
255
256
257
def get_total_data_size(self):
    """
    Get the estimated total data size.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_total_data_size' method must be implemented by subclasses.")

get_wells()

Get the list of well identifiers.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
214
215
216
217
218
219
220
221
def get_wells(self):
    """
    Get the list of well identifiers.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'get_wells' method must be implemented by subclasses.")

init_metadata()

Initialize and load metadata.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
23
24
25
26
27
28
29
30
def init_metadata(self):
    """
    Initialize and load metadata.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'init_metadata' method must be implemented by subclasses.")

is_rgb()

Check if the source is a RGB(A) image.

Source code in src/ImageSource.py
190
191
192
193
194
def is_rgb(self):
    """
    Check if the source is a RGB(A) image.
    """
    raise NotImplementedError("The 'is_rgb' method must be implemented by subclasses.")

is_screen()

Check if the source is a screen (multi-well).

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in src/ImageSource.py
32
33
34
35
36
37
38
39
def is_screen(self):
    """
    Check if the source is a screen (multi-well).

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError("The 'is_screen' method must be implemented by subclasses.")

print_well_matrix()

Print a matrix representation of the well plate.

Source code in src/ImageSource.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def print_well_matrix(self):
    """
    Print a matrix representation of the well plate.
    """
    s = ''

    rows, cols = self.get_rows(), self.get_columns()
    used_wells = [well for well in self.get_wells()]

    well_matrix = []
    for row_id in rows:
        row = ''
        for col_id in cols:
            well_id = f'{row_id}{col_id}'
            row += '+' if well_id in used_wells else ' '
        well_matrix.append(row)

    header = ' '.join([pad_leading_zero(col) for col in cols])
    s += ' ' + header + '\n'
    for idx, row in enumerate(well_matrix):
        s += f'{rows[idx]} ' + '  '.join(row) + '\n'
    return s

IncucyteSource

IncucyteSource

Bases: ImageSource

ImageSource implementation for Incucyte data

Handles the specific directory structure: EssenFiles/ScanData/YYMM/DD/HHMM/XXXX/*.tif

Filenames follow pattern: WELL-FIELD-CHANNEL.tif e.g., A1-1-C1.tif, B2-1-Ph.tif

Note: Multiple plates can exist in the same archive, identified by the XXXX folder. Use plate_id parameter to select a specific plate, or use get_available_plates() to discover all plates in the archive.

Source code in src/IncucyteSource.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
class IncucyteSource(ImageSource):
    """
    ImageSource implementation for Incucyte data

    Handles the specific directory structure:
    EssenFiles/ScanData/YYMM/DD/HHMM/XXXX/*.tif

    Filenames follow pattern: WELL-FIELD-CHANNEL.tif
    e.g., A1-1-C1.tif, B2-1-Ph.tif

    Note: Multiple plates can exist in the same archive, identified by the XXXX folder.
    Use plate_id parameter to select a specific plate, or use get_available_plates() 
    to discover all plates in the archive.
    """

    DIAG_ZIP_FILENAME = "Diag.zip"
    DIAG_LOG_FILENAME = "Diag.log"

    def __init__(self, uri, metadata=None, plate_id=None):
        """
        Initialize IncucyteSource.

        Args:
            uri (str): Path to the Incucyte archive folder
            metadata (dict): Optional metadata dictionary
            plate_id (str, optional): Specific plate ID to process (e.g., '700', '701').
                                     If None, will use the first available plate or all 
                                     if only one exists.
        """
        # Avoid the mutable-default-argument pitfall: a shared `{}` default
        # would be mutated across all instances by init_metadata().
        super().__init__(uri, metadata if metadata is not None else {})
        self.plate_id = plate_id
        self.base_path = Path(self.uri)
        self.scan_data_path = self.base_path / "EssenFiles" / "ScanData"
        self._file_cache = {}
        self._file_caching = False
        # Default to True for filling missing images
        self.fill_missing_images = True

    @staticmethod
    def get_available_plates(uri):
        """
        Discover all available plate IDs in an Incucyte archive.

        Args:
            uri (str): Path to the Incucyte archive folder

        Returns:
            list: List of plate IDs (strings) found in the archive
        """
        base_path = Path(uri)
        scan_data_path = base_path / "EssenFiles" / "ScanData"

        if not scan_data_path.exists():
            raise ValueError(f"Scan data path not found: {scan_data_path}")

        plate_ids = set()

        # Navigate through the YYMM/DD/HHMM/XXXX directory structure;
        # the fourth level (XXXX) is the plate ID.
        for year_month in scan_data_path.iterdir():
            if not year_month.is_dir():
                continue
            for day in year_month.iterdir():
                if not day.is_dir():
                    continue
                for time_dir in day.iterdir():
                    if not time_dir.is_dir():
                        continue
                    for plate_dir in time_dir.iterdir():
                        if plate_dir.is_dir():
                            plate_ids.add(plate_dir.name)

        return sorted(plate_ids)

    def enable_file_caching(self, file_caching=True):
        """
        Enable or disable file caching for image data.

        Args:
            file_caching (bool): If True, enable file caching; if False, disable it.
        """
        self._file_caching = file_caching
        if not file_caching:
            self._file_cache.clear()

    def _find_and_parse_diag_log(self):
        """
        Find the first Diag.zip in the scan data and parse it.

        Returns:
            dict: Parsed diag metadata or None if not found
        """
        # Look for first Diag.zip in the scan data
        diag_zip_files = list(self.scan_data_path.rglob(self.DIAG_ZIP_FILENAME))

        if diag_zip_files:
            results = self._parse_diag_log(diag_zip_files[0])
        else:
            results = None

        return results

    def _parse_diag_log(self, diag_zip_path):
        """
        Parse Diag.log from a Diag.zip file to extract imaging metadata.

        Args:
            diag_zip_path (Path): Path to Diag.zip file

        Returns:
            dict: Dictionary with 'pixel_sizes' (dict of mag->size),
                  'experiments' (dict of expid->metadata), or None if failed
        """
        try:
            with zipfile.ZipFile(diag_zip_path) as zip_ref:
                if self.DIAG_LOG_FILENAME not in zip_ref.namelist():
                    return None

                raw = zip_ref.read(self.DIAG_LOG_FILENAME)
                # Diag.log encoding varies between instruments; detect it.
                detection = chardet.detect(raw)
                content = raw.decode(detection['encoding'], errors='ignore')

                # Parse imaging specifications (microns/pixel per magnification)
                pixel_sizes = {}
                mag_pattern = r'(\d+)x:\s+.*?Image Resolution:\s+([\d.]+)\s+microns/pixel'
                for match in re.finditer(mag_pattern, content, re.DOTALL):
                    mag = match.group(1) + 'x'
                    pixel_size = float(match.group(2))
                    pixel_sizes[mag] = pixel_size

                # Parse experiment entries
                experiments = {}
                # Match ExpID and capture next 2 lines for Lmp info
                exp_pattern = r'ExpID=(\d+)[^\n]*Mag=(\d+x)[^\n]*(?:\n[^\n]*)?'
                for match in re.finditer(exp_pattern, content):
                    exp_id = match.group(1)
                    mag = match.group(2)

                    # Extract all exposure times from matched section
                    exp_section = match.group(0)
                    acq_times = re.findall(r'AcqTime=(\d+)', exp_section)

                    experiments[exp_id] = {
                        'magnification': mag,
                        'exposure_times_ms': [int(t) for t in acq_times] if acq_times else None,
                        'pixel_size_um': pixel_sizes.get(mag)
                    }

                results = {
                    'experiments': experiments,
                }

                # Plate format (e.g. "96-well"): take the most frequent mention.
                nwell_raw = re.findall(r'(\d+)-well', content)
                if nwell_raw:
                    results['nwell_plate'] = int(Counter(nwell_raw).most_common(1)[0][0])

                return results
        except Exception as e:
            print(f"Warning: Could not parse {self.DIAG_LOG_FILENAME} from {diag_zip_path}: {e}")
            return None

    def init_metadata(self):
        """Initialize all metadata from Incucyte structure"""
        self._scan_timepoints()  # Must be first to set plate_id
        self._get_experiment_metadata()  # Uses plate_id in name
        self._get_sample_image_info()
        self._get_well_info()
        self._get_channel_info()
        self._get_image_info()

        # Initialize properties like TiffSource does
        self.name = self.metadata.get("Name", "Incucyte_Experiment")
        self.dim_order = self.metadata.get("dim_order", "tczyx")
        self.dtype = self.metadata.get("dtype", np.uint16)
        self.pixel_size = self._get_pixel_size_dict()
        self.channels = self._format_channels_for_interface()
        self.is_plate = len(self.metadata.get("wells", {})) > 0
        self.wells = list(self.metadata.get("wells", {}).keys())
        self.rows = self.metadata.get("well_info", {}).get("rows", [])
        self.columns = self.metadata.get("well_info", {}).get("columns", [])

        nt = len(self.metadata["time_points"])
        nc = self.metadata["num_channels"]
        nz = 1  # Incucyte is typically 2D
        self.shape = nt, nc, nz, self.height, self.width
        # One shape per pyramid level; scales are relative to level 0.
        self.shapes = [(nt, nc, nz, height, width) for height, width in zip(self.heights, self.widths)]
        self.scales = [np.mean([width / self.width, height / self.height]) for width, height in zip(self.widths, self.heights)]

        return self.metadata

    def _get_experiment_metadata(self):
        """Extract experiment metadata from folder structure"""
        experiment_name = self.base_path.name

        # Add plate ID to name (plate_id is set by _scan_timepoints)
        if self.plate_id:
            experiment_name = f"{experiment_name}_plate{self.plate_id}"

        self.metadata.update(
            {
                "Name": experiment_name,
                "Creator": "Incucyte",
                "DateCreated": datetime.now(),
                "dim_order": "tczyx",
            }
        )

    def _scan_timepoints(self):
        """Scan the Incucyte directory structure for timepoints"""
        timepoints = []
        wells = set()
        fields = set()
        channels = set()
        found_plate_ids = set()

        print(f"Scanning directory: {self.scan_data_path}")

        if not self.scan_data_path.exists():
            raise ValueError(
                f"Scan data path not found: {self.scan_data_path}"
            )

        # Navigate through year/month directories (YYMM)
        for year_month in self.scan_data_path.iterdir():
            if not year_month.is_dir():
                continue
            # Navigate through day directories (DD)
            for day in year_month.iterdir():
                if not day.is_dir():
                    continue
                # Navigate through time directories (HHMM)
                for time_dir in day.iterdir():
                    if not time_dir.is_dir():
                        continue
                    # Navigate through plate ID directories (XXXX)
                    for plate_dir in time_dir.iterdir():
                        if not plate_dir.is_dir():
                            continue

                        current_plate_id = plate_dir.name
                        found_plate_ids.add(current_plate_id)

                        # Filter by plate_id if specified
                        if self.plate_id is not None:
                            if current_plate_id != self.plate_id:
                                continue

                        timepoint_path = plate_dir
                        timestamp = (
                            f"{year_month.name}_{day.name}_{time_dir.name}"
                        )

                        # Parse timestamp to datetime
                        try:
                            dt = datetime.strptime(timestamp, "%y%m_%d_%H%M")
                            if dt.year < 2000:
                                dt = dt.replace(year=dt.year + 2000)
                        except ValueError:
                            dt = None

                        timepoint_info = {
                            "path": timepoint_path,
                            "timestamp": timestamp,
                            "datetime": dt,
                            "index": len(timepoints),
                            "plate_id": current_plate_id,
                        }
                        timepoints.append(timepoint_info)

                        # Scan TIFF files in this timepoint
                        for tiff_file in timepoint_path.glob("*.tif"):
                            well, field, channel = self._parse_filename(tiff_file.name)
                            if well and field is not None and channel:
                                wells.add(well)
                                fields.add(field)
                                channels.add(channel)

        # Handle plate selection
        if self.plate_id is None:
            # Auto-select plate
            if len(found_plate_ids) == 0:
                raise ValueError("No plates found in the archive")
            elif len(found_plate_ids) == 1:
                # Single plate - use it automatically
                self.plate_id = list(found_plate_ids)[0]
            else:
                # Multiple plates - use first with warning
                plate_list = ", ".join(sorted(found_plate_ids))
                print(
                    f"Warning: Multiple plates found ({plate_list}). "
                    f"Using first plate: {sorted(found_plate_ids)[0]}"
                )
                print(
                    "To process a specific plate, use: "
                    "IncucyteSource(uri, plate_id='XXX')"
                )
                print(
                    "To process all plates, call get_available_plates() "
                    "and create separate sources"
                )
                self.plate_id = sorted(found_plate_ids)[0]

            # Filter timepoints to selected plate
            timepoints = [
                tp for tp in timepoints if tp["plate_id"] == self.plate_id
            ]
        else:
            # Validate specified plate_id
            if self.plate_id not in found_plate_ids:
                raise ValueError(
                    f"Plate ID '{self.plate_id}' not found. "
                    f"Available plates: {', '.join(sorted(found_plate_ids))}"
                )
            # Filter timepoints to specified plate
            timepoints = [
                tp for tp in timepoints if tp["plate_id"] == self.plate_id
            ]

        # Store found plate IDs in metadata
        self.metadata["available_plates"] = sorted(found_plate_ids)
        self.metadata["selected_plate"] = self.plate_id

        # Sort timepoints by datetime if available, otherwise by timestamp.
        # The (bool, value) key keeps parsed and unparsed timestamps in
        # separate groups so datetime and str are never compared directly
        # (a mixed key would raise TypeError).
        timepoints.sort(
            key=lambda x: (x["datetime"] is None,
                           x["datetime"] if x["datetime"] else x["timestamp"])
        )

        # Update indices after sorting
        for i, tp in enumerate(timepoints):
            tp["index"] = i

        self.metadata.update(
            {
                "timepoints": timepoints,
                "time_points": [tp["index"] for tp in timepoints],
                "wells_raw": sorted(wells),
                "fields_raw": sorted(fields),
                "channels_raw": sorted(channels),
            }
        )

        plate_info = (
            f" (plate: {self.plate_id})" if self.plate_id else ""
        )
        print(
            f"Found{plate_info}: {len(timepoints)} timepoints, "
            f"{len(wells)} wells, {len(fields)} fields, "
            f"{len(channels)} channels"
        )

    def _parse_filename(self, filename):
        """
        Parse Incucyte filename format: WELL-FIELD-CHANNEL.tif
        Examples: A1-1-C1.tif, B2-1-Ph.tif
        Returns: (well, field, channel)
        """
        pattern = r"([A-Z]\d+)-(\d+)-(.+)\.tif"
        match = re.match(pattern, filename)
        if match:
            well = match.group(1)
            field = int(match.group(2)) - 1  # Convert to 0-based indexing
            channel = match.group(3)
            return well, field, channel
        return None, None, None

    def _get_well_info(self):
        """Process well information and determine plate layout"""
        wells_raw = self.metadata["wells_raw"]

        if not wells_raw:
            raise ValueError("No wells found in data")

        # Parse well positions
        rows = set()
        cols = set()
        wells_dict = {}

        for well_index, well_name in enumerate(wells_raw):
            row, col = split_well_name(well_name, col_as_int=True)

            rows.add(row)
            cols.add(col)

            # NOTE(review): assumes single-letter row labels (A..Z); plates
            # beyond 26 rows (e.g. "AA") would need a different row index.
            wells_dict[well_name] = {
                "Name": well_name,
                "row": ord(row) - ord("A"),
                "column": col - 1,
                "ZoneIndex": well_index,
            }

        # Prefer the plate format from Diag.log over the observed wells,
        # since not every well is necessarily imaged.
        nwell_plate = self.sample_image_info.get("nwell_plate")
        if nwell_plate:
            rows, cols = get_rows_cols_plate(nwell_plate)
        else:
            rows = sorted(rows)
            cols = [str(col) for col in sorted(cols)]

        # Get image dimensions from first available image
        sample_image_info = self.sample_image_info

        well_info = {
            "rows": rows,
            "columns": cols,
            "SensorSizeXPixels": sample_image_info["width"],
            "SensorSizeYPixels": sample_image_info["height"],
            "SitesX": 1,
            "SitesY": 1,
            "num_sites": len(self.metadata["fields_raw"]),
            "fields": [str(f) for f in self.metadata["fields_raw"]],
            "PixelSizeUm": sample_image_info["pixel_x"],
            "SensorBitness": sample_image_info["bits"],
            "max_sizex_um": sample_image_info["width"] * sample_image_info["pixel_x"],
            "max_sizey_um": sample_image_info["height"] * sample_image_info["pixel_y"],
        }

        # Add optional imaging metadata if available
        if "magnification" in sample_image_info:
            well_info["Magnification"] = sample_image_info["magnification"]
        if "exposure_times_ms" in sample_image_info:
            well_info["ExposureTimes_ms"] = sample_image_info["exposure_times_ms"]

        self.metadata.update({"wells": wells_dict, "well_info": well_info})

    def _get_sample_image_info(self):
        """Get image dimensions and bit depth from first available TIFF.
        Attempts to get accurate pixel size from Diag.log if available."""

        pixel_size_from_diag = None
        magnification = None
        exposure_time = None
        nwell_plate = None

        if self.plate_id:
            # Try to get calibrated pixel size from Diag.log
            diag_metadata = self._find_and_parse_diag_log()
            if diag_metadata and 'experiments' in diag_metadata:
                exp_info = diag_metadata['experiments'].get(self.plate_id)
                if exp_info:
                    pixel_size_from_diag = exp_info.get('pixel_size_um')
                    magnification = exp_info.get('magnification')
                    exposure_times = exp_info.get('exposure_times_ms')
                    # Use the exposure times list if available
                    exposure_time = exposure_times
                    if pixel_size_from_diag:
                        print(f"Found calibrated pixel size from {self.DIAG_LOG_FILENAME}: "
                              f"{pixel_size_from_diag} µm/pixel "
                              f"(Magnification: {magnification})")
                nwell_plate = diag_metadata.get('nwell_plate')

        for timepoint in self.metadata["timepoints"]:
            for tiff_file in timepoint["path"].glob("*.tif"):
                try:
                    # Get actual image dimensions from the file
                    with tifffile.TiffFile(str(tiff_file)) as tif:
                        page = tif.pages.first
                        width = page.sizes["width"]
                        height = page.sizes["height"]
                        dtype = page.dtype
                        bits = dtype.itemsize * 8
                        # Default to a single-level pyramid so widths/heights
                        # are always defined, even for non-pyramidal TIFFs
                        # (previously a NameError in that case).
                        widths = [width]
                        heights = [height]
                        if tif.series:
                            series_page = tif.series[0]
                            if hasattr(series_page, 'levels'):
                                level_pages = series_page.levels
                                widths = [level_page.sizes["width"] for level_page in level_pages]
                                heights = [level_page.sizes["height"] for level_page in level_pages]

                    # Use calibrated pixel size from Diag.log if available
                    if pixel_size_from_diag:
                        pixel_x = pixel_size_from_diag
                        pixel_y = pixel_size_from_diag
                    else:
                        # Fallback to TIFF metadata
                        temp_tiff_source = TiffSource(str(tiff_file))
                        temp_tiff_source.init_metadata()
                        pixel_size = temp_tiff_source.get_pixel_size_um()
                        temp_tiff_source.close()
                        pixel_x = pixel_size.get("x")
                        pixel_y = pixel_size.get("y")

                    self.sample_image_info = {
                        "width": width,
                        "height": height,
                        "bits": bits,
                        "dtype": dtype,
                        "pixel_x": pixel_x,
                        "pixel_y": pixel_y,
                    }
                    self.width = width
                    self.height = height
                    self.widths = widths
                    self.heights = heights

                    # Add optional metadata if available
                    if magnification:
                        self.sample_image_info["magnification"] = magnification
                    if exposure_time:
                        self.sample_image_info["exposure_times_ms"] = exposure_time
                    if nwell_plate:
                        self.sample_image_info["nwell_plate"] = nwell_plate

                    return

                except Exception as e:
                    print(f"Could not read sample image {tiff_file}: {e}")
                    continue

        # If no valid TIFF files found
        raise ValueError(
            f"No valid TIFF files found in experiment directory: "
            f"{self.scan_data_path}"
        )

    def _get_channel_info(self):
        """Process channel information"""
        channels_raw = self.metadata["channels_raw"]
        channels = []

        channel_mapping = {
            "C1": {"label": "Green", "color": "00FF00"},
            "C2": {"label": "Red", "color": "FF0000"},
            "Ph": {"label": "Phase_Contrast", "color": "FFFFFF"},
            "P": {"label": "Phase_Contrast", "color": "FFFFFF"},
        }

        for i, channel_code in enumerate(channels_raw):
            # Unknown channel codes fall back to the raw code with white color
            channel_info = channel_mapping.get(
                channel_code, {"label": channel_code, "color": "FFFFFF"}
            )

            channels.append(
                {
                    "ChannelNumber": i,
                    "Dye": channel_info["label"],
                    "Color": f"#{channel_info['color']}",
                    "Emission": None,
                    "Excitation": None,
                    "code": channel_code,
                }
            )

        self.metadata.update({"channels": channels, "num_channels": len(channels)})

    def _get_image_info(self):
        """Get image-related metadata"""
        well_info = self.metadata["well_info"]
        # Upper bound on uncompressed data size (assumes every well/field/
        # channel/timepoint combination exists)
        max_data_size = (
            well_info["SensorSizeXPixels"]
            * well_info["SensorSizeYPixels"]
            * len(self.metadata["wells"])
            * well_info["num_sites"]
            * self.metadata["num_channels"]
            * len(self.metadata["time_points"])
            * (self.sample_image_info["bits"] // 8)
        )

        self.metadata.update(
            {
                "bits_per_pixel": self.sample_image_info["bits"],
                "dtype": self.sample_image_info["dtype"],
                "max_data_size": max_data_size,
            }
        )

    def _get_pixel_size_dict(self):
        """Get pixel size in TiffSource format"""
        well_info = self.metadata.get("well_info", {})
        pixel_size = well_info.get("PixelSizeUm", 1.0)
        return {"x": pixel_size, "y": pixel_size}

    def _format_channels_for_interface(self):
        """Format channels for interface compatibility"""
        channels = self.metadata.get("channels", [])
        return [
            {"label": channel["Dye"], "color": hexrgb_to_rgba(channel["Color"].lstrip("#"))} for channel in channels
        ]

    def _load_image_data(self, well_id, field_id, channel_id, timepoint_id, level=0):
        """Load specific image data"""
        cache_key = (well_id, field_id, channel_id, timepoint_id, level)
        if cache_key in self._file_cache:
            return self._file_cache[cache_key]

        data = None

        # Find the file for this combination
        timepoint_info = self.metadata["timepoints"][timepoint_id]
        channel_code = self.metadata["channels_raw"][channel_id]

        filename = f"{well_id}-{field_id + 1}-{channel_code}.tif"
        file_path = timepoint_info["path"] / filename

        message = ""
        # Check if file exists
        if not file_path.exists():
            if self.fill_missing_images:
                message = f"Warning: Missing image file {file_path}, filled with black image"
            else:
                raise FileNotFoundError(f"Image file not found: {file_path}")
        else:
            # Only attempt to read files that exist; missing files are
            # handled above instead of via a doomed open attempt.
            try:
                with tifffile.TiffFile(str(file_path)) as tif:
                    data = tif.asarray(level=level)
            except Exception as e:
                if self.fill_missing_images:
                    message = f"Warning: Could not read image file {file_path}: {e}, filled with black image"
                else:
                    raise

        if data is None and self.fill_missing_images:
            # Create a black image matching the requested pyramid level
            if level < len(self.heights):
                fill_height, fill_width = self.heights[level], self.widths[level]
            else:
                fill_height = self.sample_image_info["height"]
                fill_width = self.sample_image_info["width"]
            data = np.zeros((fill_height, fill_width),
                            dtype=self.sample_image_info["dtype"])
            print(message)

        if self._file_caching:
            self._file_cache[cache_key] = data
        return data

    # ImageSource interface methods
    def is_screen(self):
        return self.is_plate

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        """Get data for a specific well and field"""
        well_id = strip_leading_zeros(well_id)

        if well_id not in self.metadata["wells"]:
            raise ValueError(
                f"Invalid Well: {well_id}. Available: {list(self.metadata['wells'].keys())}"
            )

        field_id = int(field_id)
        if field_id not in self.metadata["fields_raw"]:
            raise ValueError(
                f"Invalid Field: {field_id}. Available: {self.metadata['fields_raw']}"
            )

        # Build 5D array: (t, c, z, y, x)
        nt = len(self.metadata["time_points"])
        nc = self.metadata["num_channels"]

        # Use the shape of the requested pyramid level (self.shape is level 0)
        shape = self.shapes[level] if level < len(self.shapes) else self.shape
        data = np.zeros(shape, dtype=self.sample_image_info["dtype"])

        for t in range(nt):
            for c in range(nc):
                image_data = self._load_image_data(well_id, field_id, c, t, level=level)
                # Handle different image shapes
                if image_data.ndim > 2:
                    # assume data [z, y, x] - TODO: for 3D support _load_image_data() needs to handle z properly
                    data[t, c, :, :, :] = image_data
                else:
                    data[t, c, 0, :, :] = image_data

        return redimension_data(data, self.dim_order, dim_order)

    def get_shape(self):
        return self.shape

    def get_scales(self):
        return self.scales

    def get_name(self):
        return self.name

    def get_dim_order(self):
        return self.dim_order

    def get_dtype(self):
        return self.dtype

    def get_pixel_size_um(self):
        return self.pixel_size

    def get_position_um(self, well_id=None):
        # NOTE(review): wells_dict entries never define CoordX/CoordY in this
        # class, so both defaults of 0 apply and the position is (0, 0) —
        # confirm whether a subclass or caller injects coordinates.
        well = self.metadata["wells"].get(well_id, {})
        well_info = self.metadata["well_info"]
        x = well.get("CoordX", 0) * well_info.get("max_sizex_um", 0)
        y = well.get("CoordY", 0) * well_info.get("max_sizey_um", 0)
        return {"x": x, "y": y}

    def get_channels(self):
        return self.channels

    def get_nchannels(self):
        return max(self.metadata.get("num_channels", 1), 1)

    def get_rows(self):
        return self.rows

    def get_columns(self):
        return self.columns

    def get_wells(self):
        return self.wells

    def get_time_points(self):
        return self.metadata.get("time_points", [])

    def get_fields(self):
        return self.metadata.get("well_info", {}).get("fields", [])

    def get_well_coords_um(self, well_id):
        """Get well coordinates (placeholder - Incucyte doesn't typically have stage coordinates)"""
        return {"x": 0.0, "y": 0.0}

    def get_acquisitions(self):
        """Return acquisition information based on timepoints"""
        acquisitions = []
        for i, tp in enumerate(self.metadata.get("timepoints", [])):
            acq = {
                "id": i,
                "name": f"Timepoint_{tp['timestamp']}",
                "description": f"Incucyte acquisition at {tp['timestamp']}",
                "date_created": tp["datetime"].isoformat()
                if tp["datetime"]
                else tp["timestamp"],
                "date_modified": tp["datetime"].isoformat()
                if tp["datetime"]
                else tp["timestamp"],
            }
            acquisitions.append(acq)
        return acquisitions

    def get_total_data_size(self):
        return self.metadata.get("max_data_size", 0)

    def print_well_matrix(self):
        """Print a visual representation of the plate layout"""
        s = ""
        well_info = self.metadata.get("well_info", {})
        rows = well_info.get("rows", [])
        cols = [int(c) for c in well_info.get("columns", [])]
        used_wells = set(self.metadata.get("wells", {}).keys())

        # Header with column numbers
        header = "   " + "  ".join(f"{col:2d}" for col in cols)
        s += header + "\n"

        # Each row
        for row_letter in rows:
            row_line = f"{row_letter}  "
            for col_num in cols:
                well_id = f"{row_letter}{col_num}"
                row_line += " + " if well_id in used_wells else "   "
            s += row_line + "\n"

        return s

    def print_timepoint_well_matrix(self):
        """Print timepoint vs well matrix"""
        s = ""
        timepoints = self.metadata.get("timepoints", [])
        wells = list(self.metadata.get("wells", {}).keys())

        # Header
        header = "Timepoint   " + "  ".join(f"{well:>3}" for well in wells)
        s += header + "\n"

        # Check which wells have data at each timepoint
        for tp in timepoints:
            line = f"{tp['timestamp']:>9}   "
            for well in wells:
                # Check if any files exist for this well at this timepoint
                has_data = any(
                    (tp["path"] / f"{well}-{field + 1}-{channel}.tif").exists()
                    for field in self.metadata.get("fields_raw", [])
                    for channel in self.metadata.get("channels_raw", [])
                )
                line += " + " if has_data else "   "
            s += line + "\n"

        return s

    def is_rgb(self):
        """
        Check if the source is a RGB(A) image.
        Incucyte data stores channels separately, not as RGB.
        """
        return False

    def close(self):
        """Clean up resources"""
        self._file_cache.clear()

DIAG_LOG_FILENAME = 'Diag.log' class-attribute instance-attribute

DIAG_ZIP_FILENAME = 'Diag.zip' class-attribute instance-attribute

base_path = Path(self.uri) instance-attribute

fill_missing_images = True instance-attribute

plate_id = plate_id instance-attribute

scan_data_path = self.base_path / 'EssenFiles' / 'ScanData' instance-attribute

__init__(uri, metadata={}, plate_id=None)

Initialize IncucyteSource.

Parameters:

Name Type Description Default
uri str

Path to the Incucyte archive folder

required
metadata dict

Optional metadata dictionary

{}
plate_id str

Specific plate ID to process (e.g., '700', '701'). If None, will use the first available plate or all if only one exists.

None
Source code in src/IncucyteSource.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(self, uri, metadata={}, plate_id=None):
    """
    Initialize IncucyteSource.

    Args:
        uri (str): Path to the Incucyte archive folder
        metadata (dict): Optional metadata dictionary
        plate_id (str, optional): Specific plate ID to process (e.g., '700', '701').
                                 If None, will use the first available plate or all 
                                 if only one exists.
    """
    super().__init__(uri, metadata)
    self.plate_id = plate_id
    self.base_path = Path(self.uri)
    self.scan_data_path = self.base_path / "EssenFiles" / "ScanData"
    self._file_cache = {}
    self._file_caching = False
    # Default to True for filling missing images
    self.fill_missing_images = True

close()

Clean up resources

Source code in src/IncucyteSource.py
796
797
798
def close(self):
    """Clean up resources"""
    self._file_cache.clear()

enable_file_caching(file_caching=True)

Enable or disable file caching for image data.

Parameters:

Name Type Description Default
file_caching bool

If True, enable file caching; if False, disable it.

True
Source code in src/IncucyteSource.py
89
90
91
92
93
94
95
96
97
98
def enable_file_caching(self, file_caching=True):
    """
    Enable or disable file caching for image data.

    Args:
        file_caching (bool): If True, enable file caching; if False, disable it.
    """
    self._file_caching = file_caching
    if not file_caching:
        self._file_cache.clear()

get_acquisitions()

Return acquisition information based on timepoints

Source code in src/IncucyteSource.py
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
def get_acquisitions(self):
    """Return acquisition information based on timepoints"""
    acquisitions = []
    for i, tp in enumerate(self.metadata.get("timepoints", [])):
        acq = {
            "id": i,
            "name": f"Timepoint_{tp['timestamp']}",
            "description": f"Incucyte acquisition at {tp['timestamp']}",
            "date_created": tp["datetime"].isoformat()
            if tp["datetime"]
            else tp["timestamp"],
            "date_modified": tp["datetime"].isoformat()
            if tp["datetime"]
            else tp["timestamp"],
        }
        acquisitions.append(acq)
    return acquisitions

get_available_plates(uri) staticmethod

Discover all available plate IDs in an Incucyte archive.

Parameters:

Name Type Description Default
uri str

Path to the Incucyte archive folder

required

Returns:

Name Type Description
list

List of plate IDs (strings) found in the archive

Source code in src/IncucyteSource.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@staticmethod
def get_available_plates(uri):
    """
    Discover all available plate IDs in an Incucyte archive.

    Args:
        uri (str): Path to the Incucyte archive folder

    Returns:
        list: List of plate IDs (strings) found in the archive
    """
    base_path = Path(uri)
    scan_data_path = base_path / "EssenFiles" / "ScanData"

    if not scan_data_path.exists():
        raise ValueError(f"Scan data path not found: {scan_data_path}")

    plate_ids = set()

    # Navigate through the directory structure to find all plate IDs
    for year_month in scan_data_path.iterdir():
        if not year_month.is_dir():
            continue
        for day in year_month.iterdir():
            if not day.is_dir():
                continue
            for time_dir in day.iterdir():
                if not time_dir.is_dir():
                    continue
                for plate_dir in time_dir.iterdir():
                    if plate_dir.is_dir():
                        plate_ids.add(plate_dir.name)

    return sorted(list(plate_ids))

get_channels()

Source code in src/IncucyteSource.py
696
697
def get_channels(self):
    return self.channels

get_columns()

Source code in src/IncucyteSource.py
705
706
def get_columns(self):
    return self.columns

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Get data for a specific well and field

Source code in src/IncucyteSource.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    """Get data for a specific well and field"""
    well_id = strip_leading_zeros(well_id)

    if well_id not in self.metadata["wells"]:
        raise ValueError(
            f"Invalid Well: {well_id}. Available: {list(self.metadata['wells'].keys())}"
        )

    field_id = int(field_id)
    if field_id not in self.metadata["fields_raw"]:
        raise ValueError(
            f"Invalid Field: {field_id}. Available: {self.metadata['fields_raw']}"
        )

    # Build 5D array: (t, c, z, y, x)
    nt = len(self.metadata["time_points"])
    nc = self.metadata["num_channels"]

    data = np.zeros(self.shape, dtype=self.sample_image_info["dtype"])

    for t in range(nt):
        for c in range(nc):
            image_data = self._load_image_data(well_id, field_id, c, t, level=level)
            # Handle different image shapes
            if image_data.ndim > 2:
                # assume data [z, y, x] - TODO: for 3D support _load_image_data() needs to handle z properly
                data[t, c, :, :, :] = image_data
            else:
                data[t, c, 0, :, :] = image_data

    return redimension_data(data, self.dim_order, dim_order)

get_dim_order()

Source code in src/IncucyteSource.py
680
681
def get_dim_order(self):
    return self.dim_order

get_dtype()

Source code in src/IncucyteSource.py
683
684
def get_dtype(self):
    return self.dtype

get_fields()

Source code in src/IncucyteSource.py
714
715
def get_fields(self):
    return self.metadata.get("well_info", {}).get("fields", [])

get_name()

Source code in src/IncucyteSource.py
677
678
def get_name(self):
    return self.name

get_nchannels()

Source code in src/IncucyteSource.py
699
700
def get_nchannels(self):
    return max(self.metadata.get("num_channels", 1), 1)

get_pixel_size_um()

Source code in src/IncucyteSource.py
686
687
def get_pixel_size_um(self):
    return self.pixel_size

get_position_um(well_id=None)

Source code in src/IncucyteSource.py
689
690
691
692
693
694
def get_position_um(self, well_id=None):
    well = self.metadata["wells"].get(well_id, {})
    well_info = self.metadata["well_info"]
    x = well.get("CoordX", 0) * well_info.get("max_sizex_um", 0)
    y = well.get("CoordY", 0) * well_info.get("max_sizey_um", 0)
    return {"x": x, "y": y}

get_rows()

Source code in src/IncucyteSource.py
702
703
def get_rows(self):
    return self.rows

get_scales()

Source code in src/IncucyteSource.py
674
675
def get_scales(self):
    return self.scales

get_shape()

Source code in src/IncucyteSource.py
671
672
def get_shape(self):
    return self.shape

get_time_points()

Source code in src/IncucyteSource.py
711
712
def get_time_points(self):
    return self.metadata.get("time_points", [])

get_total_data_size()

Source code in src/IncucyteSource.py
739
740
def get_total_data_size(self):
    return self.metadata.get("max_data_size", 0)

get_well_coords_um(well_id)

Get well coordinates (placeholder - Incucyte doesn't typically have stage coordinates)

Source code in src/IncucyteSource.py
717
718
719
def get_well_coords_um(self, well_id):
    """Get well coordinates (placeholder - Incucyte doesn't typically have stage coordinates)"""
    return {"x": 0.0, "y": 0.0}

get_wells()

Source code in src/IncucyteSource.py
708
709
def get_wells(self):
    return self.wells

init_metadata()

Initialize all metadata from Incucyte structure

Source code in src/IncucyteSource.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def init_metadata(self):
    """Initialize all metadata from Incucyte structure"""
    self._scan_timepoints()  # Must be first to set plate_id
    self._get_experiment_metadata()  # Uses plate_id in name
    self._get_sample_image_info()
    self._get_well_info()
    self._get_channel_info()
    self._get_image_info()

    # Initialize properties like TiffSource does
    self.name = self.metadata.get("Name", "Incucyte_Experiment")
    self.dim_order = self.metadata.get("dim_order", "tczyx")
    self.dtype = self.metadata.get("dtype", np.uint16)
    self.pixel_size = self._get_pixel_size_dict()
    self.channels = self._format_channels_for_interface()
    self.is_plate = len(self.metadata.get("wells", {})) > 0
    self.wells = list(self.metadata.get("wells", {}).keys())
    self.rows = self.metadata.get("well_info", {}).get("rows", [])
    self.columns = self.metadata.get("well_info", {}).get("columns", [])

    nt = len(self.metadata["time_points"])
    nc = self.metadata["num_channels"]
    nz = 1  # Incucyte is typically 2D
    self.shape = nt, nc, nz, self.height, self.width
    self.shapes = [(nt, nc, nz, height, width) for height, width in zip(self.heights, self.widths)]
    self.scales = [np.mean([width / self.width, height / self.height]) for width, height in zip(self.widths, self.heights)]

    return self.metadata

is_rgb()

Check if the source is an RGB(A) image. Incucyte data stores channels separately, not as RGB.

Source code in src/IncucyteSource.py
789
790
791
792
793
794
def is_rgb(self):
    """
    Check if the source is a RGB(A) image.
    Incucyte data stores channels separately, not as RGB.
    """
    return False

is_screen()

Source code in src/IncucyteSource.py
635
636
def is_screen(self):
    return self.is_plate

print_timepoint_well_matrix()

Print timepoint vs well matrix

Source code in src/IncucyteSource.py
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
def print_timepoint_well_matrix(self):
    """Print timepoint vs well matrix"""
    s = ""
    timepoints = self.metadata.get("timepoints", [])
    wells = list(self.metadata.get("wells", {}).keys())

    # Header
    header = "Timepoint   " + "  ".join(f"{well:>3}" for well in wells)
    s += header + "\n"

    # Check which wells have data at each timepoint
    for tp in timepoints:
        line = f"{tp['timestamp']:>9}   "
        for well in wells:
            # Check if any files exist for this well at this timepoint
            has_data = any(
                (tp["path"] / f"{well}-{field + 1}-{channel}.tif").exists()
                for field in self.metadata.get("fields_raw", [])
                for channel in self.metadata.get("channels_raw", [])
            )
            line += " + " if has_data else "   "
        s += line + "\n"

    return s

print_well_matrix()

Print a visual representation of the plate layout

Source code in src/IncucyteSource.py
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
def print_well_matrix(self):
    """Print a visual representation of the plate layout"""
    s = ""
    well_info = self.metadata.get("well_info", {})
    rows = well_info.get("rows", [])
    cols = [int(c) for c in well_info.get("columns", [])]
    used_wells = set(self.metadata.get("wells", {}).keys())

    # Header with column numbers
    header = "   " + "  ".join(f"{col:2d}" for col in cols)
    s += header + "\n"

    # Each row
    for row_letter in rows:
        row_line = f"{row_letter}  "
        for col_num in cols:
            well_id = f"{row_letter}{col_num}"
            row_line += " + " if well_id in used_wells else "   "
        s += row_line + "\n"

    return s

MiraxSource

PYRAMID_DOWNSCALE = 2 module-attribute

PYRAMID_LEVELS = 6 module-attribute

RETRY_ATTEMPTS = 3 module-attribute

TIFF_COMPRESSION = 'LZW' module-attribute

TILE_SIZE = 1024 module-attribute

VERSION = 'v0.1.10' module-attribute

ZARR_CHUNK_SIZE = TILE_SIZE module-attribute

ZARR_SHARD_MULTIPLIER = 10 module-attribute

MiraxSource

Bases: ImageSource

ImageSource subclass for reading Mirax files using OpenSlide.

Source code in src/MiraxSource.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class MiraxSource(ImageSource):
    """
    ImageSource subclass for reading Mirax files using OpenSlide.
    """

    def __init__(self, uri, metadata={}):
        super().__init__(uri, metadata)
        self.slide = openslide.open_slide(uri)

    def init_metadata(self):
        self.metadata = {key.lower(): value for key, value in dict(self.slide.properties).items()}

        self.dimensions = self.slide.level_dimensions
        self.widths = [width for width, height in self.slide.level_dimensions]
        self.heights = [height for width, height in self.slide.level_dimensions]
        self.level_downsamples = self.slide.level_downsamples
        self.scales = [1 / downsample for downsample in self.level_downsamples]
        self.nchannels = 3      # Mirax is RGBA; convert to RGB
        self.shapes = [(height, width, self.nchannels) for (width, height) in self.dimensions]
        self.shape = self.shapes[0]
        self.dim_order = 'yxc'
        self.is_rgb_channels = True
        nbits = 8
        for key, value in self.metadata.items():
            if 'slide_name' in key:
                self.name = value
            if 'slide_bitdepth' in key:
                nbits = int(value)
        self.dtype = np.dtype(f'uint{nbits}')

        # OpenSlide stores microns per pixel in properties
        mpp_x = float(self.metadata.get(openslide.PROPERTY_NAME_MPP_X, 1))
        mpp_y = float(self.metadata.get(openslide.PROPERTY_NAME_MPP_Y, 1))
        self.pixel_size = {'x': mpp_x, 'y': mpp_y}
        background_float = hexrgb_to_rgba(self.metadata.get(openslide.PROPERTY_NAME_BACKGROUND_COLOR, '000000'))[:3]
        self.background = [np.uint8(value * 255) for value in background_float]

        self.name = get_filetitle(self.uri)
        return self.metadata

    def is_screen(self):
        # Mirax files are not multi-well screens
        return False

    def get_shape(self):
        return self.shape

    def get_shapes(self):
        return self.shapes

    def get_scales(self):
        return self.scales

    # TODO: check (x/y) source data is read in order first to last (currently last to first) using dask, or use generator/stream to dask?
    # read_tile_array(50000, 180000, 1000, 1000, 0)

    def read_array(self, x, y, width, height, level=0):
        # OpenSlide uses (x, y) coordinates in level 0 reference size
        x0 = int(x * self.level_downsamples[level])
        y0 = int(y * self.level_downsamples[level])
        #return np.array(self.slide.read_region((x0, y0), level, (width, height)).convert('RGB'))   # discard alpha
        rgba = np.array(self.slide.read_region((x0, y0), level, (width, height)))
        alpha = np.atleast_3d(rgba[..., 3] / np.float32(255))
        rgb = (rgba[..., :3] * alpha + self.background * (1 - alpha)).astype(np.uint8)
        return rgb

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        data = self.read_array(0, 0, self.widths[level], self.heights[level], level=level)
        return redimension_data(data, self.dim_order, dim_order)

    def get_data_as_dask(self, dim_order, level=0, **kwargs):
        dask.config.set(scheduler='single-threaded')

        def get_lazy_tile(x, y, width, height, level=0):
            lazy_array = dask.delayed(self.read_array)(x, y, width, height, level)
            return da.from_delayed(lazy_array, shape=(height, width, self.nchannels), dtype=self.dtype)

        y_chunks, x_chunks = da.core.normalize_chunks(TILE_SIZE, self.shapes[level][:2], dtype=self.dtype)
        y_pos = np.cumsum([0] + list(y_chunks)[:-1])
        x_pos = np.cumsum([0] + list(x_chunks)[:-1])
        data = da.concatenate(
            [da.concatenate(
                [get_lazy_tile(x, y, width, height, level=level)
                 for x, width in zip(x_pos, x_chunks)], axis=1)
             for y, height in zip(y_pos, y_chunks)], axis=0)
        return redimension_data(data, self.dim_order, dim_order)

    def get_data_as_generator(self, dim_order, **kwargs):
        def data_generator(scale=1):
            level, rescale = get_level_from_scale(self.scales, scale)
            read_size = int(TILE_SIZE / rescale)
            for y in range(0, self.heights[level], read_size):
                for x in range(0, self.widths[level], read_size):
                    data = self.read_array(x, y, read_size, read_size, level)
                    if rescale != 1:
                        shape = np.multiply(data.shape[:2], rescale).astype(int)
                        data = sk_transform.resize(data, shape, preserve_range=True).astype(data.dtype)
                    yield redimension_data(data, self.dim_order, dim_order)
        return data_generator

    def get_name(self):
        return self.name

    def get_dim_order(self):
        return self.dim_order

    def get_dtype(self):
        return self.dtype

    def get_pixel_size_um(self):
        return self.pixel_size

    def get_position_um(self, well_id=None):
        # Not applicable for Mirax
        return {'x': 0, 'y': 0}

    def get_channels(self):
        # Mirax is RGB, return NGFF-style channel metadata
        return [
            {"name": "Red", "color": [1, 0, 0, 1]},
            {"name": "Green", "color": [0, 1, 0, 1]},
            {"name": "Blue", "color": [0, 0, 1, 1]},
            #{"name": "Alpha", "color": [1, 1, 1, 1]}
        ]

    def get_nchannels(self):
        return self.nchannels

    def is_rgb(self):
        return True

    def get_rows(self):
        return []

    def get_columns(self):
        return []

    def get_wells(self):
        return []

    def get_time_points(self):
        return []

    def get_fields(self):
        return []

    def get_acquisitions(self):
        return []

    def get_total_data_size(self):
        return np.prod(self.shape) * np.dtype(self.get_dtype()).itemsize

    def close(self):
        self.slide.close()

slide = openslide.open_slide(uri) instance-attribute

__init__(uri, metadata={})

Source code in src/MiraxSource.py
20
21
22
def __init__(self, uri, metadata={}):
    super().__init__(uri, metadata)
    self.slide = openslide.open_slide(uri)

close()

Source code in src/MiraxSource.py
167
168
def close(self):
    self.slide.close()

get_acquisitions()

Source code in src/MiraxSource.py
161
162
def get_acquisitions(self):
    return []

get_channels()

Source code in src/MiraxSource.py
131
132
133
134
135
136
137
138
def get_channels(self):
    # Mirax is RGB, return NGFF-style channel metadata
    return [
        {"name": "Red", "color": [1, 0, 0, 1]},
        {"name": "Green", "color": [0, 1, 0, 1]},
        {"name": "Blue", "color": [0, 0, 1, 1]},
        #{"name": "Alpha", "color": [1, 1, 1, 1]}
    ]

get_columns()

Source code in src/MiraxSource.py
149
150
def get_columns(self):
    return []

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Source code in src/MiraxSource.py
81
82
83
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    data = self.read_array(0, 0, self.widths[level], self.heights[level], level=level)
    return redimension_data(data, self.dim_order, dim_order)

get_data_as_dask(dim_order, level=0, **kwargs)

Source code in src/MiraxSource.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_data_as_dask(self, dim_order, level=0, **kwargs):
    dask.config.set(scheduler='single-threaded')

    def get_lazy_tile(x, y, width, height, level=0):
        lazy_array = dask.delayed(self.read_array)(x, y, width, height, level)
        return da.from_delayed(lazy_array, shape=(height, width, self.nchannels), dtype=self.dtype)

    y_chunks, x_chunks = da.core.normalize_chunks(TILE_SIZE, self.shapes[level][:2], dtype=self.dtype)
    y_pos = np.cumsum([0] + list(y_chunks)[:-1])
    x_pos = np.cumsum([0] + list(x_chunks)[:-1])
    data = da.concatenate(
        [da.concatenate(
            [get_lazy_tile(x, y, width, height, level=level)
             for x, width in zip(x_pos, x_chunks)], axis=1)
         for y, height in zip(y_pos, y_chunks)], axis=0)
    return redimension_data(data, self.dim_order, dim_order)

get_data_as_generator(dim_order, **kwargs)

Source code in src/MiraxSource.py
102
103
104
105
106
107
108
109
110
111
112
113
def get_data_as_generator(self, dim_order, **kwargs):
    def data_generator(scale=1):
        level, rescale = get_level_from_scale(self.scales, scale)
        read_size = int(TILE_SIZE / rescale)
        for y in range(0, self.heights[level], read_size):
            for x in range(0, self.widths[level], read_size):
                data = self.read_array(x, y, read_size, read_size, level)
                if rescale != 1:
                    shape = np.multiply(data.shape[:2], rescale).astype(int)
                    data = sk_transform.resize(data, shape, preserve_range=True).astype(data.dtype)
                yield redimension_data(data, self.dim_order, dim_order)
    return data_generator

get_dim_order()

Source code in src/MiraxSource.py
118
119
def get_dim_order(self):
    return self.dim_order

get_dtype()

Source code in src/MiraxSource.py
121
122
def get_dtype(self):
    return self.dtype

get_fields()

Source code in src/MiraxSource.py
158
159
def get_fields(self):
    return []

get_name()

Source code in src/MiraxSource.py
115
116
def get_name(self):
    return self.name

get_nchannels()

Source code in src/MiraxSource.py
140
141
def get_nchannels(self):
    return self.nchannels

get_pixel_size_um()

Source code in src/MiraxSource.py
124
125
def get_pixel_size_um(self):
    return self.pixel_size

get_position_um(well_id=None)

Source code in src/MiraxSource.py
127
128
129
def get_position_um(self, well_id=None):
    # Not applicable for Mirax
    return {'x': 0, 'y': 0}

get_rows()

Source code in src/MiraxSource.py
146
147
def get_rows(self):
    return []

get_scales()

Source code in src/MiraxSource.py
65
66
def get_scales(self):
    return self.scales

get_shape()

Source code in src/MiraxSource.py
59
60
def get_shape(self):
    return self.shape

get_shapes()

Source code in src/MiraxSource.py
62
63
def get_shapes(self):
    return self.shapes

get_time_points()

Source code in src/MiraxSource.py
155
156
def get_time_points(self):
    return []

get_total_data_size()

Source code in src/MiraxSource.py
164
165
def get_total_data_size(self):
    return np.prod(self.shape) * np.dtype(self.get_dtype()).itemsize

get_wells()

Source code in src/MiraxSource.py
152
153
def get_wells(self):
    return []

init_metadata()

Source code in src/MiraxSource.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def init_metadata(self):
    self.metadata = {key.lower(): value for key, value in dict(self.slide.properties).items()}

    self.dimensions = self.slide.level_dimensions
    self.widths = [width for width, height in self.slide.level_dimensions]
    self.heights = [height for width, height in self.slide.level_dimensions]
    self.level_downsamples = self.slide.level_downsamples
    self.scales = [1 / downsample for downsample in self.level_downsamples]
    self.nchannels = 3      # Mirax is RGBA; convert to RGB
    self.shapes = [(height, width, self.nchannels) for (width, height) in self.dimensions]
    self.shape = self.shapes[0]
    self.dim_order = 'yxc'
    self.is_rgb_channels = True
    nbits = 8
    for key, value in self.metadata.items():
        if 'slide_name' in key:
            self.name = value
        if 'slide_bitdepth' in key:
            nbits = int(value)
    self.dtype = np.dtype(f'uint{nbits}')

    # OpenSlide stores microns per pixel in properties
    mpp_x = float(self.metadata.get(openslide.PROPERTY_NAME_MPP_X, 1))
    mpp_y = float(self.metadata.get(openslide.PROPERTY_NAME_MPP_Y, 1))
    self.pixel_size = {'x': mpp_x, 'y': mpp_y}
    background_float = hexrgb_to_rgba(self.metadata.get(openslide.PROPERTY_NAME_BACKGROUND_COLOR, '000000'))[:3]
    self.background = [np.uint8(value * 255) for value in background_float]

    self.name = get_filetitle(self.uri)
    return self.metadata

is_rgb()

Source code in src/MiraxSource.py
143
144
def is_rgb(self):
    return True

is_screen()

Source code in src/MiraxSource.py
55
56
57
def is_screen(self):
    # Mirax files are not multi-well screens
    return False

read_array(x, y, width, height, level=0)

Source code in src/MiraxSource.py
71
72
73
74
75
76
77
78
79
def read_array(self, x, y, width, height, level=0):
    # OpenSlide uses (x, y) coordinates in level 0 reference size
    x0 = int(x * self.level_downsamples[level])
    y0 = int(y * self.level_downsamples[level])
    #return np.array(self.slide.read_region((x0, y0), level, (width, height)).convert('RGB'))   # discard alpha
    rgba = np.array(self.slide.read_region((x0, y0), level, (width, height)))
    alpha = np.atleast_3d(rgba[..., 3] / np.float32(255))
    rgb = (rgba[..., :3] * alpha + self.background * (1 - alpha)).astype(np.uint8)
    return rgb

OmeTiffWriter

PYRAMID_DOWNSCALE = 2 module-attribute

PYRAMID_LEVELS = 6 module-attribute

RETRY_ATTEMPTS = 3 module-attribute

TIFF_COMPRESSION = 'LZW' module-attribute

TILE_SIZE = 1024 module-attribute

VERSION = 'v0.1.10' module-attribute

ZARR_CHUNK_SIZE = TILE_SIZE module-attribute

ZARR_SHARD_MULTIPLIER = 10 module-attribute

OmeTiffWriter

Bases: OmeWriter

Writes image data and metadata to OME-TIFF files.

Source code in src/OmeTiffWriter.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class OmeTiffWriter(OmeWriter):
    """
    Writes image data and metadata to OME-TIFF files.
    """
    def __init__(self, verbose=False):
        """
        Initialize OmeTiffWriter.

        Args:
            verbose (bool): If True, prints progress info.
        """
        super().__init__()
        self.verbose = verbose

    def write(self, filepath, source, **kwargs):
        """
        Writes image or screen data to OME-TIFF files.

        Args:
            filepath (str): Output file path.
            source (ImageSource): Source object.
            **kwargs: Additional options (e.g. wells selection).

        Returns:
            dict: Containing output_path: str or list Output file path(s) and data window.
        """

        dim_order = ''
        source_dim_order = source.get_dim_order()
        if source.get_time_points():
            dim_order += 't'
        if 'c' in source_dim_order and not source.is_rgb():
            dim_order += 'c'
        if 'z' in source_dim_order:
            dim_order += 'z'
        dim_order += 'yx'
        if 'c' in source_dim_order and source.is_rgb():
            dim_order += 'c'
        self.dim_order = dim_order

        if source.is_screen():
            filepath, total_size, window = self._write_screen(filepath, source, **kwargs)
        else:
            filepath, total_size, window = self._write_image(filepath, source, **kwargs)

        if self.verbose:
            print(f'Total data written: {print_hbytes(total_size)}')

        return {'output_path': filepath, 'total_size':total_size, 'window': window}

    def _write_screen(self, filename, source, **kwargs):
        """
        Writes multi-well screen data to separate TIFF files and companion metadata.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            **kwargs: Additional options (e.g. wells selection).

        Returns:
            tuple: (List of output paths, total data size, image window)
        """
        # writes separate tiff files for each field, and separate metadata companion file
        window = []
        output_paths = []
        filepath, filename = os.path.split(filename)
        filetitle = os.path.splitext(filename)[0].rstrip('.ome')

        companion_filename = os.path.join(filepath, filetitle + '.companion.ome')
        companion_uuid = create_uuid()

        wells = kwargs.get('wells', source.get_wells())
        fields = list(map(str, source.get_fields()))

        total_size = 0
        image_uuids = []
        image_filenames = []
        for well_id in wells:
            for field_id in fields:
                resolution, resolution_unit = create_resolution_metadata(source)
                data = source.get_data(self.dim_order, well_id=well_id, field_id=field_id)

                filename = f'{filetitle}'
                filename += f'_{pad_leading_zero(well_id)}'
                if field_id is not None:
                    filename += f'_{pad_leading_zero(field_id)}'
                filename = os.path.join(filepath, filename + '.ome.tiff')
                xml_metadata, image_uuid = create_binaryonly_metadata(os.path.basename(companion_filename), companion_uuid)

                size, window = self._write_tiff(filename, source, data,
                                                resolution=resolution, resolution_unit=resolution_unit,
                                                tile_size=TILE_SIZE, compression=TIFF_COMPRESSION,
                                                xml_metadata=xml_metadata,
                                                pyramid_levels=PYRAMID_LEVELS, pyramid_downscale=PYRAMID_DOWNSCALE,
                                                well_id=well_id, field_id=field_id, **kwargs)

                image_uuids.append(image_uuid)
                image_filenames.append(os.path.basename(filename))
                output_paths.append(filename)
                total_size += size

        xml_metadata = create_metadata(source,
                                       uuid=companion_uuid, image_uuids=image_uuids, image_filenames=image_filenames,
                                       wells=wells)
        with open(companion_filename, 'wb') as file:
            file.write(xml_metadata.encode())

        output_paths = [companion_filename] + output_paths

        return output_paths, total_size, window

    def _write_image(self, filename, source, **kwargs):
        """
        Writes single image data to a TIFF file.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            **kwargs: Additional options.

        Returns:
            tuple: (Output path, data size)
        """
        xml_metadata = create_metadata(source, image_filenames=[filename])
        resolution, resolution_unit = create_resolution_metadata(source)
        data_generator = source.get_data_as_generator(self.dim_order)

        size, window = self._write_tiff(filename, source, data_generator,
                                        resolution=resolution, resolution_unit=resolution_unit,
                                        tile_size=TILE_SIZE, compression=TIFF_COMPRESSION,
                                        xml_metadata=xml_metadata,
                                        pyramid_levels=PYRAMID_LEVELS, pyramid_downscale=PYRAMID_DOWNSCALE,
                                        **kwargs)

        return filename, size, window

    def _write_tiff(self, filename, source, data,
                    resolution=None, resolution_unit=None, tile_size=None, compression=None, compressionargs=None,
                    xml_metadata=None, pyramid_levels=0, pyramid_downscale=2, well_id=None, field_id=None, **kwargs):
        """
        Writes image data to a TIFF file with optional pyramids and metadata.

        Args:
            filename (str): Output file name.
            source (ImageSource): Source object.
            data (ndarray or generator): Image data.
            resolution (tuple, optional): Pixel resolution.
            resolution_unit (str, optional): Resolution unit.
            tile_size (int or tuple, optional): Tile size.
            compression (str, optional): Compression type.
            xml_metadata (str, optional): OME-XML metadata.
            pyramid_levels (int): Number of pyramid levels.
            pyramid_downscale (int): Pyramid downscale factor.

        Returns:
            int: Data size in bytes.
        """
        is_generator = inspect.isgeneratorfunction(data)
        if is_generator:
            data_generator = data
            shape = list(source.shape)
            dtype = source.get_dtype()
        else:
            shape = list(data.shape)
            dtype = data.dtype

        source_dim_order = source.get_dim_order()
        x_index = source_dim_order.index('x')
        y_index = source_dim_order.index('y')
        if tile_size is not None:
            if isinstance(tile_size, int):
                tile_size = [tile_size] * 2
            if tile_size[0] > shape[y_index] or tile_size[1] > shape[x_index]:
                tile_size = None

        if xml_metadata is not None:
            # set ome=False to provide custom OME xml in description
            xml_metadata_bytes = xml_metadata.encode()
            is_ome = False
        else:
            xml_metadata_bytes = None
            is_ome = True

        # maximum size (w/o compression)
        if is_generator:
            data_size = np.prod(shape) * dtype.itemsize
        else:
            data_size = data.size * data.itemsize
        max_size = 0
        scale = 1
        for level in range(1 + pyramid_levels):
            max_size += data_size * scale ** 2
            scale /= pyramid_downscale
        is_bigtiff = (max_size > 2 ** 32)

        window_scanner = WindowScanner()
        with TiffWriter(filename, bigtiff=is_bigtiff, ome=is_ome) as writer:
            for level in range(pyramid_levels + 1):
                if level == 0:
                    scale = 1
                    subifds = pyramid_levels
                    subfiletype = None
                    new_shape = shape
                else:
                    scale /= pyramid_downscale
                    new_shape = list(shape)
                    new_shape[x_index] = int(shape[x_index] * scale)
                    new_shape[y_index] = int(shape[y_index] * scale)
                    if not is_generator:
                        data = resize(data, new_shape, preserve_range=True).astype(dtype)
                    subifds = None
                    subfiletype = 1
                    xml_metadata_bytes = None
                if is_generator:
                    data = data_generator(scale)
                writer.write(data, shape=tuple(new_shape), dtype=dtype, metadata={'axes': self.dim_order},
                             subifds=subifds, subfiletype=subfiletype,
                             resolution=resolution, resolutionunit=resolution_unit, tile=tile_size,
                             compression=compression, compressionargs=compressionargs,
                             description=xml_metadata_bytes)
                if level == pyramid_levels:
                    window = source.get_image_window(window_scanner, well_id=well_id, field_id=field_id, data=data)
        return data_size, window

verbose = verbose instance-attribute

__init__(verbose=False)

Initialize OmeTiffWriter.

Parameters:

Name Type Description Default
verbose bool

If True, prints progress info.

False
Source code in src/OmeTiffWriter.py
18
19
20
21
22
23
24
25
26
def __init__(self, verbose=False):
    """
    Create a new OmeTiffWriter.

    Args:
        verbose (bool): When True, progress information is printed while writing.
    """
    super().__init__()
    self.verbose = verbose

write(filepath, source, **kwargs)

Writes image or screen data to OME-TIFF files.

Parameters:

Name Type Description Default
filepath str

Output file path.

required
source ImageSource

Source object.

required
**kwargs

Additional options (e.g. wells selection).

{}

Returns:

Name Type Description
dict

A dict containing 'output_path' (str or list): the output file path(s), plus 'total_size' and the data 'window'.

Source code in src/OmeTiffWriter.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def write(self, filepath, source, **kwargs):
    """
    Writes image or screen data to OME-TIFF files.

    Args:
        filepath (str): Output file path.
        source (ImageSource): Source object.
        **kwargs: Additional options (e.g. wells selection).

    Returns:
        dict: Containing output_path: str or list Output file path(s) and data window.
    """
    source_dims = source.get_dim_order()
    has_channels = 'c' in source_dims
    rgb = source.is_rgb()

    # Assemble the output dimension order: optional t/c/z prefix, then 'yx',
    # with a trailing (interleaved) 'c' for RGB data.
    order = 't' if source.get_time_points() else ''
    if has_channels and not rgb:
        order += 'c'
    if 'z' in source_dims:
        order += 'z'
    order += 'yx'
    if has_channels and rgb:
        order += 'c'
    self.dim_order = order

    # Plates (screens) are written well-by-well; plain images in one pass.
    if source.is_screen():
        filepath, total_size, window = self._write_screen(filepath, source, **kwargs)
    else:
        filepath, total_size, window = self._write_image(filepath, source, **kwargs)

    if self.verbose:
        print(f'Total data written: {print_hbytes(total_size)}')

    return {'output_path': filepath, 'total_size': total_size, 'window': window}

convert_dotnet_ticks_to_datetime(net_ticks)

Source code in src/util.py
137
138
def convert_dotnet_ticks_to_datetime(net_ticks):
    """Convert .NET ticks (100 ns units since 0001-01-01) to a datetime (microsecond resolution)."""
    microseconds = net_ticks // 10
    return datetime(year=1, month=1, day=1) + timedelta(microseconds=microseconds)

convert_to_um(value, unit)

Source code in src/util.py
184
185
186
187
188
189
190
191
192
def convert_to_um(value, unit):
    """Convert a physical size to micrometers; unrecognized units pass through unscaled."""
    unit_scale = {}
    for names, factor in (
            (('nm',), 1e-3),
            (('µm', 'um', 'micrometer', 'micron'), 1),
            (('mm', 'millimeter'), 1e3),
            (('cm', 'centimeter'), 1e4),
            (('m', 'meter'), 1e6)):
        for name in names:
            unit_scale[name] = factor
    return value * unit_scale.get(unit, 1)

ensure_list(item)

Source code in src/util.py
 7
 8
 9
10
def ensure_list(item):
    """Wrap item in a list unless it is already a list or tuple (which is returned as-is)."""
    return item if isinstance(item, (list, tuple)) else [item]

get_filetitle(filename)

Source code in src/util.py
68
69
def get_filetitle(filename):
    """Return the file name without its directory or (last) extension."""
    root, _ = os.path.splitext(filename)
    return os.path.basename(root)

get_level_from_scale(source_scales, target_scale=1)

Source code in src/util.py
54
55
56
57
58
59
60
61
def get_level_from_scale(source_scales, target_scale=1):
    """
    Pick the pyramid level best matching target_scale.

    Returns (level, residual_rescale): an exact match yields rescale 1;
    otherwise the last level whose scale is <= target is chosen and the
    remaining rescale factor is returned.
    """
    best = (0, target_scale)
    for idx, level_scale in enumerate(source_scales):
        if np.isclose(level_scale, target_scale, rtol=1e-4):
            return idx, 1
        if level_scale <= target_scale:
            best = (idx, target_scale / level_scale)
    return best

get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size)

Source code in src/util.py
39
40
41
42
43
44
45
46
47
48
49
50
51
def get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size):
    """
    Extract a y/x tile at the given t/c/z indices from an array whose axes
    follow dim_order (any of 't', 'c', 'z' may be absent).
    """
    selector = [slice(None)] * len(dim_order)
    for dim, index in (('t', t), ('c', c), ('z', z)):
        if dim in dim_order:
            selector[dim_order.index(dim)] = index
    selector[dim_order.index('y')] = slice(y, y + y_size)
    selector[dim_order.index('x')] = slice(x, x + x_size)
    return data[tuple(selector)]

get_rows_cols_plate(nwells)

Source code in src/util.py
122
123
124
125
126
127
128
129
130
131
132
133
134
def get_rows_cols_plate(nwells):
    """
    Return (row labels, column labels) for a standard well-plate format.

    Supported formats: 6, 12, 24, 48, 96 and 384 wells; raises KeyError otherwise.
    """
    layouts = {
        6: (2, 3),
        12: (3, 4),
        24: (4, 6),
        48: (6, 8),
        96: (8, 12),
        384: (16, 24),
    }
    nrows, ncols = layouts[nwells]
    row_labels = [chr(ord('A') + i) for i in range(nrows)]
    col_labels = [str(i + 1) for i in range(ncols)]
    return row_labels, col_labels

pad_leading_zero(input_string, num_digits=2)

Source code in src/util.py
104
105
106
107
108
109
110
111
112
113
114
def pad_leading_zero(input_string, num_digits=2):
    """
    Zero-pad a number, or the column part of a well name like 'A1', to num_digits.

    Numeric input ('5' -> '05') is padded directly; well-style input has its
    column normalized and padded ('A1' -> 'A01').
    """
    text = str(input_string)
    row = None
    if not text[0].isdigit():
        # Well name: pad only the (zero-stripped) column part.
        row, col = split_well_name(text, remove_leading_zeros=True)
        text = str(col)
    text = text.zfill(num_digits)
    if row is not None:
        text = row + text
    return text

print_dict(value, tab=0, max_len=250, bullet=False)

Source code in src/util.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def print_dict(value, tab=0, max_len=250, bullet=False):
    """
    Render a (nested) dict as an indented multi-line string.

    Each key starts on a new line indented by `tab` tabs; nested dicts recurse
    one level deeper, lists render their items inline (dicts in lists get a
    leading '-' bullet on their first key). Long scalar values are truncated
    to max_len characters followed by '...'.
    """
    if not isinstance(value, dict):
        return str(value) + ' '

    parts = []
    first_entry = True
    for key, entry in value.items():
        line = '\n'
        # only the first key of a bulleted dict carries the '-' marker
        if bullet and first_entry:
            line += '-'
        first_entry = False
        line += '\t' * tab + str(key) + ': '
        if isinstance(entry, dict):
            line += print_dict(entry, tab + 1)
        elif isinstance(entry, list):
            line += ''.join(print_dict(item, tab + 1, bullet=True) for item in entry)
        else:
            text = str(entry)
            if len(text) > max_len:
                text = text[:max_len] + '...'
            line += text
        parts.append(line)
    return ''.join(parts)

print_hbytes(nbytes)

Source code in src/util.py
219
220
221
222
223
224
225
226
227
228
229
230
def print_hbytes(nbytes):
    """Format a byte count as a human-readable string, e.g. 2048 -> '2.0KB'."""
    prefixes = ['', 'K', 'M', 'G', 'T', 'P', 'E']
    unit = 1024
    power = 0
    # divide down until the value fits within one unit (strictly > keeps 1024 as '1024.0B')
    while nbytes > unit:
        nbytes /= unit
        power += 1
    suffix = prefixes[power] if power < len(prefixes) else f'e{power * 3}'
    return f'{nbytes:.1f}{suffix}B'

redimension_data(data, old_order, new_order, **indices)

Source code in src/util.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def redimension_data(data, old_order, new_order, **indices):
    """
    Convert an array from one dimension order to another.

    Dimensions absent from new_order are removed by selecting an index
    (from `indices`, default 0); missing dimensions are inserted as singleton
    axes; remaining axes are permuted into new_order.
    """
    if old_order == new_order:
        return data

    result = data
    current = old_order
    # drop dimensions not present in the target order
    for dim in old_order:
        if dim not in new_order:
            axis = current.index(dim)
            result = np.take(result, indices=indices.get(dim, 0), axis=axis)
            current = current.replace(dim, '')
    # insert missing dimensions as new length-1 leading axes
    for dim in new_order:
        if dim not in current:
            result = np.expand_dims(result, 0)
            current = dim + current
    # permute axes into the requested order
    source_axes = [current.index(dim) for dim in new_order]
    return np.moveaxis(result, source_axes, list(range(len(new_order))))

split_well_name(well_name, remove_leading_zeros=True, col_as_int=False)

Source code in src/util.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def split_well_name(well_name, remove_leading_zeros=True, col_as_int=False):
    """
    Split a well name like 'A01' into its row and column parts.

    Args:
        well_name (str): Name in row-letters + column-digits form ('A1', 'B02', ...).
        remove_leading_zeros (bool): Strip leading zeros from the column ('01' -> '1').
        col_as_int (bool): Return the column as an int instead of a str.

    Raises:
        ValueError: If well_name does not match the expected pattern.
    """
    match = re.search(r'(\D+)(\d+)', well_name)
    if match is None:
        raise ValueError(f"Invalid well name format: {well_name}. Expected format like 'A1', 'B2', etc.")
    row, col = match.groups()
    if col_as_int or remove_leading_zeros:
        # the regex guarantees col is all digits, so int() cannot fail
        col = int(col)
    if not col_as_int:
        col = str(col)
    return row, col

splitall(path)

Source code in src/util.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def splitall(path):
    """Split a path into all of its components, e.g. '/a/b' -> ['/', 'a', 'b']."""
    components = []
    while True:
        head, tail = os.path.split(path)
        if head == path:
            # absolute-path sentinel: os.path.split no longer reduces the head
            components.insert(0, head)
            break
        if tail == path:
            # relative-path sentinel: nothing left to split off
            components.insert(0, tail)
            break
        components.insert(0, tail)
        path = head
    return components

strip_leading_zeros(well_name)

Source code in src/util.py
117
118
119
def strip_leading_zeros(well_name):
    """Normalize a well name by removing leading zeros from its column part ('A01' -> 'A1')."""
    row, col = split_well_name(well_name, remove_leading_zeros=True)
    return '{}{}'.format(row, col)

validate_filename(filename)

Source code in src/util.py
64
65
def validate_filename(filename):
    """Replace characters unsafe for file names with underscores."""
    unsafe_chars = re.compile(r'[^\w_.)(-]')
    return unsafe_chars.sub('_', filename)

xml_content_to_dict(element)

Source code in src/util.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def xml_content_to_dict(element):
    """
    Recursively convert an XML element tree into nested dicts/lists.

    'Array' elements become lists of their children; leaf text is coerced to
    float/bool where possible (NOTE(review): float is tried before int, so
    numeric strings always become floats, never ints — confirm this is
    intended). 'DataObject'/'Attribute' elements are keyed by their
    'ObjectType'/'Name' attributes instead of their tag.
    """
    key = element.tag
    children = list(element)
    if key == 'Array':
        res = [xml_content_to_dict(child) for child in children]
        return res
    if len(children) > 0:
        # Container element: collect Array children into a list,
        # otherwise merge child dicts into a single dict.
        if children[0].tag == 'Array':
            value = []
        else:
            value = {}
        for child in children:
            child_value = xml_content_to_dict(child)
            if isinstance(child_value, list):
                value.extend(child_value)
            else:
                value |= child_value
    else:
        # Leaf element: parse the text content.
        value = element.text
        if value is not None:
            if '"' in value:
                # quoted text stays a string, with the quotes stripped
                value = value.replace('"', '')
            else:
                # try numeric/boolean coercion; the bare break after the bool
                # branch leaves any other string unchanged
                for t in (float, int, bool):
                    try:
                        if t == bool:
                            if value.lower() == 'true':
                                value = True
                            if value.lower() == 'false':
                                value = False
                        else:
                            value = t(value)
                        break
                    except (TypeError, ValueError):
                        pass

    if key == 'DataObject':
        key = element.attrib['ObjectType']
    if key == 'Attribute':
        key = element.attrib['Name']
    return {key: value}

OmeWriter

OmeWriter

Bases: ABC

Abstract base class for OME writers.

Source code in src/OmeWriter.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class OmeWriter(ABC):
    """
    Abstract base class for OME writers.
    """

    def write(self, filepath, source, verbose=False, **kwargs) -> dict:
        """
        Write image data and metadata to output.

        Args:
            filepath (str): Output file path.
            source (ImageSource): Source object.
            verbose (bool): If True, prints progress info.
            **kwargs: Additional options.

        Returns:
            dict: Containing output_path: str or list Output file path(s), and other optional output.
        """
        # Implementations return a dict with at least 'output_path'
        # (annotation fixed from '-> str' to match the documented contract).
        raise NotImplementedError("This method should be implemented by subclasses.")

write(filepath, source, verbose=False, **kwargs)

Write image data and metadata to output.

Parameters:

Name Type Description Default
filepath str

Output file path.

required
source ImageSource

Source object.

required
verbose bool

If True, prints progress info.

False
**kwargs

Additional options.

{}

Returns:

Name Type Description
dict str

Containing output_path: str or list Output file path(s), and other optional output.

Source code in src/OmeWriter.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def write(self, filepath, source, verbose=False, **kwargs) -> str:
    """
    Write image data and metadata to output.

    Args:
        filepath (str): Output file path.
        source (ImageSource): Source object.
        verbose (bool): If True, prints progress info.
        **kwargs: Additional options.

    Returns:
        dict: Containing output_path: str or list Output file path(s), and other optional output.
    """
    # Expect to return output path (or filepath)
    raise NotImplementedError("This method should be implemented by subclasses.")

OmeZarrSource

OmeZarrSource

Bases: ImageSource

Source code in src/OmeZarrSource.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class OmeZarrSource(ImageSource):
    """Image source reading OME-Zarr data (plain images and HCS plates) via ome-zarr-py."""

    def _get_reader(self, add_path=None):
        # Open an ome-zarr Reader at self.uri, optionally descending into a
        # sub-path (used for per-well/field access on plates).
        uri = self.uri
        if add_path:
            uri = os.path.join(uri, add_path)
        location = parse_url(uri)
        if location is None:
            raise FileNotFoundError(f'Error parsing ome-zarr file {uri}')
        reader = Reader(location)
        nodes = list(reader())
        return reader, nodes

    def _get_metadata(self, add_path=None):
        # Metadata of the first node at the (sub-)path; empty dict if none found.
        metadata = {}
        _, nodes = self._get_reader(add_path)
        if len(nodes) > 0:
            metadata = nodes[0].metadata
        return metadata

    def init_metadata(self):
        """Populate dim order, pixel size, pyramid scales, plate layout and shapes from the zarr metadata."""
        reader, nodes = self._get_reader()
        if 'bioformats2raw.layout' in reader.zarr.root_attrs:
            # TODO: use paths provided in metadata
            reader, nodes = self._get_reader('0')
        # nodes may include images, labels etc
        if len(nodes) == 0:
            raise FileNotFoundError(f'No image data found in ome-zarr file {self.uri}')
        # first node will be the image pixel data
        image_node = nodes[0]
        self.metadata = image_node.metadata
        # channel metadata from ome-zarr-py limited; get from root_attrs manually
        #self.root_metadata = reader.zarr.root_attrs

        axes = self.metadata.get('axes', [])
        self.dim_order = ''.join([axis.get('name') for axis in axes])
        units = {axis['name']: axis['unit'] for axis in axes if 'unit' in axis}
        self.plate = self.metadata.get('metadata', {}).get('plate')
        self.is_plate = self.plate is not None

        # one 'scale' transform per pyramid level
        scales = [transform['scale'] for transform_set in self.metadata['coordinateTransformations']
                  for transform in transform_set if transform['type'] == 'scale']
        self.pixel_size = {dim: convert_to_um(pixel_size, units.get(dim, '')) for dim, pixel_size
                           in zip(self.dim_order, scales[0]) if dim in 'xyz'}
        x_index, y_index = self.dim_order.index('x'), self.dim_order.index('y')
        # NOTE(review): np.mean of the single-element list [x + y] is just x + y;
        # mean of [x, y] was probably intended — the factor of 2 cancels in the
        # ratio below, so the computed relative scales are unaffected.
        scale0 = np.mean([scales[0][x_index] + scales[0][y_index]])
        self.scales = [float(scale0 / np.mean([scale[x_index] + scale[y_index]])) for scale in scales]
        if self.is_plate:
            self.name = self.plate.get('name', '')
            self.rows = [row['name'] for row in self.plate.get('rows', [])]
            self.columns = [column['name'] for column in self.plate.get('columns', [])]
            self.wells = {well['path'].replace('/', ''): well['path'] for well in self.plate.get('wells')}
            self.fields = list(range(self.plate.get('field_count', 0)))
            self.paths = {well_id: {field: f'{well_path}/{field}' for field in self.fields} for well_id, well_path in self.wells.items()}
            self.acquisitions = self.plate.get('acquisitions', [])
            self.data = None    # data will be read per plate well
        else:
            self.name = self.metadata.get('name', '')
            self.data = image_node.data
        if not self.name:
            self.name = get_filetitle(self.uri)
        # NOTE(review): rstrip('.ome') strips any trailing '.', 'o', 'm', 'e'
        # characters, not the literal suffix (e.g. 'genome' -> 'gen');
        # removesuffix('.ome') was likely intended — confirm before changing.
        self.name = str(self.name).rstrip('.ome')

        self.shapes = [data.shape for data in image_node.data]
        self.shape = self.shapes[0]
        self.heights = [shape[y_index] for shape in self.shapes]
        self.widths = [shape[x_index] for shape in self.shapes]
        self.dtype = image_node.data[0].dtype

    def is_screen(self):
        # True when the source is an HCS plate rather than a single image.
        return self.is_plate

    def get_shape(self):
        # Shape of the highest-resolution level.
        return self.shape

    def get_shapes(self):
        # Shapes of all pyramid levels.
        return self.shapes

    def get_scales(self):
        # Relative x/y scales of all pyramid levels (level 0 == 1).
        return self.scales

    def get_data(self, level=0, well_id=None, field_id=None, **kwargs):
        # Plain image: return the cached dask/numpy level data.
        # Plate: open the well/field sub-path and return its level data.
        if well_id is None and field_id is None:
            return self.data[level]
        else:
            _, nodes = self._get_reader(self.paths[well_id][field_id])
            return nodes[0].data[level]

    def get_data_as_generator(self, dim_order, **kwargs):
        # Returns a generator factory; the generator yields TILE_SIZE tiles in
        # t -> c -> z -> y -> x order, resized when the requested scale falls
        # between pyramid levels, and redimensioned to dim_order.
        def data_generator(scale=1):
            level, rescale = get_level_from_scale(self.scales, scale)
            level_data = self.data[level]
            read_size = int(TILE_SIZE / rescale)
            nz = self.shape[self.dim_order.index('z')] if 'z' in self.dim_order else 1
            for t in range(len(self.get_time_points())):
                for c in range(self.get_nchannels()):
                    for z in range(nz):
                        for y in range(0, self.heights[level], read_size):
                            for x in range(0, self.widths[level], read_size):
                                data = get_numpy_data(level_data, dim_order, t, c, z, y, x, read_size, read_size)
                                if rescale != 1:
                                    data = sk_transform.resize(data,
                                                               (np.array(data.shape) * rescale).astype(int),
                                                               preserve_range=True).astype(data.dtype)
                                yield redimension_data(data, self.dim_order, dim_order)
        return data_generator

    def get_image_window(self, window_scanner, well_id=None, field_id=None, data=None):
        # Contrast window from metadata; window_scanner and data are unused here
        # (kept for interface compatibility with other sources).
        if well_id is None and field_id is None:
            metadata = self.metadata
        else:
            metadata = self._get_metadata(self.paths[well_id][field_id])
        window = np.transpose(metadata.get('contrast_limits', ([], [])))
        return window

    def get_name(self):
        return self.name

    def get_dim_order(self):
        return self.dim_order

    def get_dtype(self):
        return self.dtype

    def get_pixel_size_um(self):
        return self.pixel_size

    def get_position_um(self, well_id=None):
        # Translation transform of the well's first field, if present.
        metadata = self._get_metadata(self.paths[well_id][0])
        for transforms in metadata['coordinateTransformations'][0]:
            if transforms['type'] == 'translation':
                return {dim:value for dim, value in zip(self.dim_order, transforms['translation'])}
        return {}

    def get_channels(self):
        # Channel labels from metadata, with colors where colormaps provide them.
        channels = []
        colormaps = self.metadata['colormap']
        for channeli, channel_name in enumerate(self.metadata['channel_names']):
            channel = {'label': channel_name}
            if channeli < len(colormaps):
                channel['color'] = colormaps[channeli][-1]
            channels.append(channel)
        return channels

    def get_nchannels(self):
        return self.shape[self.dim_order.index('c')] if 'c' in self.dim_order else 1

    def is_rgb(self):
        # NOTE(review): treats any 3- or 4-channel image as RGB(A) — a 3-channel
        # fluorescence stack would also match; confirm this heuristic is intended.
        return self.get_nchannels() in (3, 4)

    def get_rows(self):
        return self.rows

    def get_columns(self):
        return self.columns

    def get_wells(self):
        return self.wells

    def get_time_points(self):
        nt = self.shape[self.dim_order.index('t')] if 't' in self.dim_order else 1
        return list(range(nt))

    def get_fields(self):
        return self.fields

    def get_acquisitions(self):
        return self.acquisitions

    def get_total_data_size(self):
        # Element count (not bytes) of level 0, multiplied over all wells/fields
        # for plates.
        total_size = np.prod(self.shape)
        if self.is_plate:
            total_size *= len(self.get_wells()) * len(self.get_fields())
        return total_size

get_acquisitions()

Source code in src/OmeZarrSource.py
178
179
def get_acquisitions(self):
    return self.acquisitions

get_channels()

Source code in src/OmeZarrSource.py
146
147
148
149
150
151
152
153
154
def get_channels(self):
    channels = []
    colormaps = self.metadata['colormap']
    for channeli, channel_name in enumerate(self.metadata['channel_names']):
        channel = {'label': channel_name}
        if channeli < len(colormaps):
            channel['color'] = colormaps[channeli][-1]
        channels.append(channel)
    return channels

get_columns()

Source code in src/OmeZarrSource.py
165
166
def get_columns(self):
    return self.columns

get_data(level=0, well_id=None, field_id=None, **kwargs)

Source code in src/OmeZarrSource.py
93
94
95
96
97
98
def get_data(self, level=0, well_id=None, field_id=None, **kwargs):
    if well_id is None and field_id is None:
        return self.data[level]
    else:
        _, nodes = self._get_reader(self.paths[well_id][field_id])
        return nodes[0].data[level]

get_data_as_generator(dim_order, **kwargs)

Source code in src/OmeZarrSource.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def get_data_as_generator(self, dim_order, **kwargs):
    def data_generator(scale=1):
        level, rescale = get_level_from_scale(self.scales, scale)
        level_data = self.data[level]
        read_size = int(TILE_SIZE / rescale)
        nz = self.shape[self.dim_order.index('z')] if 'z' in self.dim_order else 1
        for t in range(len(self.get_time_points())):
            for c in range(self.get_nchannels()):
                for z in range(nz):
                    for y in range(0, self.heights[level], read_size):
                        for x in range(0, self.widths[level], read_size):
                            data = get_numpy_data(level_data, dim_order, t, c, z, y, x, read_size, read_size)
                            if rescale != 1:
                                data = sk_transform.resize(data,
                                                           (np.array(data.shape) * rescale).astype(int),
                                                           preserve_range=True).astype(data.dtype)
                            yield redimension_data(data, self.dim_order, dim_order)
    return data_generator

get_dim_order()

Source code in src/OmeZarrSource.py
130
131
def get_dim_order(self):
    return self.dim_order

get_dtype()

Source code in src/OmeZarrSource.py
133
134
def get_dtype(self):
    return self.dtype

get_fields()

Source code in src/OmeZarrSource.py
175
176
def get_fields(self):
    return self.fields

get_image_window(window_scanner, well_id=None, field_id=None, data=None)

Source code in src/OmeZarrSource.py
119
120
121
122
123
124
125
def get_image_window(self, window_scanner, well_id=None, field_id=None, data=None):
    if well_id is None and field_id is None:
        metadata = self.metadata
    else:
        metadata = self._get_metadata(self.paths[well_id][field_id])
    window = np.transpose(metadata.get('contrast_limits', ([], [])))
    return window

get_name()

Source code in src/OmeZarrSource.py
127
128
def get_name(self):
    return self.name

get_nchannels()

Source code in src/OmeZarrSource.py
156
157
def get_nchannels(self):
    return self.shape[self.dim_order.index('c')] if 'c' in self.dim_order else 1

get_pixel_size_um()

Source code in src/OmeZarrSource.py
136
137
def get_pixel_size_um(self):
    return self.pixel_size

get_position_um(well_id=None)

Source code in src/OmeZarrSource.py
139
140
141
142
143
144
def get_position_um(self, well_id=None):
    metadata = self._get_metadata(self.paths[well_id][0])
    for transforms in metadata['coordinateTransformations'][0]:
        if transforms['type'] == 'translation':
            return {dim:value for dim, value in zip(self.dim_order, transforms['translation'])}
    return {}

get_rows()

Source code in src/OmeZarrSource.py
162
163
def get_rows(self):
    return self.rows

get_scales()

Source code in src/OmeZarrSource.py
90
91
def get_scales(self):
    return self.scales

get_shape()

Source code in src/OmeZarrSource.py
84
85
def get_shape(self):
    return self.shape

get_shapes()

Source code in src/OmeZarrSource.py
87
88
def get_shapes(self):
    return self.shapes

get_time_points()

Source code in src/OmeZarrSource.py
171
172
173
def get_time_points(self):
    nt = self.shape[self.dim_order.index('t')] if 't' in self.dim_order else 1
    return list(range(nt))

get_total_data_size()

Source code in src/OmeZarrSource.py
181
182
183
184
185
def get_total_data_size(self):
    total_size = np.prod(self.shape)
    if self.is_plate:
        total_size *= len(self.get_wells()) * len(self.get_fields())
    return total_size

get_wells()

Source code in src/OmeZarrSource.py
168
169
def get_wells(self):
    return self.wells

init_metadata()

Source code in src/OmeZarrSource.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def init_metadata(self):
    reader, nodes = self._get_reader()
    if 'bioformats2raw.layout' in reader.zarr.root_attrs:
        # TODO: use paths provided in metadata
        reader, nodes = self._get_reader('0')
    # nodes may include images, labels etc
    if len(nodes) == 0:
        raise FileNotFoundError(f'No image data found in ome-zarr file {self.uri}')
    # first node will be the image pixel data
    image_node = nodes[0]
    self.metadata = image_node.metadata
    # channel metadata from ome-zarr-py limited; get from root_attrs manually
    #self.root_metadata = reader.zarr.root_attrs

    axes = self.metadata.get('axes', [])
    self.dim_order = ''.join([axis.get('name') for axis in axes])
    units = {axis['name']: axis['unit'] for axis in axes if 'unit' in axis}
    self.plate = self.metadata.get('metadata', {}).get('plate')
    self.is_plate = self.plate is not None

    scales = [transform['scale'] for transform_set in self.metadata['coordinateTransformations']
              for transform in transform_set if transform['type'] == 'scale']
    self.pixel_size = {dim: convert_to_um(pixel_size, units.get(dim, '')) for dim, pixel_size
                       in zip(self.dim_order, scales[0]) if dim in 'xyz'}
    x_index, y_index = self.dim_order.index('x'), self.dim_order.index('y')
    scale0 = np.mean([scales[0][x_index] + scales[0][y_index]])
    self.scales = [float(scale0 / np.mean([scale[x_index] + scale[y_index]])) for scale in scales]
    if self.is_plate:
        self.name = self.plate.get('name', '')
        self.rows = [row['name'] for row in self.plate.get('rows', [])]
        self.columns = [column['name'] for column in self.plate.get('columns', [])]
        self.wells = {well['path'].replace('/', ''): well['path'] for well in self.plate.get('wells')}
        self.fields = list(range(self.plate.get('field_count', 0)))
        self.paths = {well_id: {field: f'{well_path}/{field}' for field in self.fields} for well_id, well_path in self.wells.items()}
        self.acquisitions = self.plate.get('acquisitions', [])
        self.data = None    # data will be read per plate well
    else:
        self.name = self.metadata.get('name', '')
        self.data = image_node.data
    if not self.name:
        self.name = get_filetitle(self.uri)
    self.name = str(self.name).rstrip('.ome')

    self.shapes = [data.shape for data in image_node.data]
    self.shape = self.shapes[0]
    self.heights = [shape[y_index] for shape in self.shapes]
    self.widths = [shape[x_index] for shape in self.shapes]
    self.dtype = image_node.data[0].dtype

is_rgb()

Source code in src/OmeZarrSource.py
159
160
def is_rgb(self):
    return self.get_nchannels() in (3, 4)

is_screen()

Source code in src/OmeZarrSource.py
81
82
def is_screen(self):
    return self.is_plate

create_axes_metadata(dimension_order)

Create axes metadata for OME-Zarr from dimension order.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required

Returns:

Name Type Description
list

List of axis metadata dictionaries.

Source code in src/ome_zarr_util.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def create_axes_metadata(dimension_order):
    """
    Create axes metadata for OME-Zarr from dimension order.

    Args:
        dimension_order (str): String of dimension characters.

    Returns:
        list: List of axis metadata dictionaries.
    """
    # 't' is time in milliseconds, 'c' is a unitless channel axis,
    # everything else is space in micrometers.
    dim_types = {'t': ('time', 'millisecond'), 'c': ('channel', None)}
    axes = []
    for dim in dimension_order:
        axis_type, unit = dim_types.get(dim, ('space', 'micrometer'))
        axis = {'name': dim, 'type': axis_type}
        if unit:
            axis['unit'] = unit
        axes.append(axis)
    return axes

create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version)

Create channel metadata for OME-Zarr.

Parameters:

Name Type Description Default
dtype

Numpy dtype of image data.

required
channels list

List of channel dicts.

required
nchannels int

Number of channels.

required
is_rgb bool

Whether the image is RGB(A).

required
window tuple

Min/max window values.

required
ome_version str

OME-Zarr version.

required

Returns:

Name Type Description
dict

Channel metadata dictionary.

Source code in src/ome_zarr_util.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version):
    """
    Create channel metadata for OME-Zarr.

    Args:
        dtype: Numpy dtype of image data.
        channels (list): List of channel dicts.
        nchannels (int): Number of channels.
        is_rgb (bool): Whether the image is an RGB(A) image.
        window (tuple): (starts, ends) per-channel window values.
        ome_version (str): OME-Zarr version.

    Returns:
        dict: Channel metadata dictionary.
    """
    if len(channels) < nchannels:
        # Synthesize default channels; only RGB(A) images get labels/colors,
        # otherwise the channel list stays empty.
        labels = []
        colors = []
        if is_rgb and nchannels in (3, 4):
            labels = ['Red', 'Green', 'Blue']
            colors = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
        if is_rgb and nchannels == 4:
            labels += ['Alpha']
            colors += [(1, 1, 1)]
        channels = [{'label': label, 'color': color} for label, color in zip(labels, colors)]

    omezarr_channels = []
    starts, ends = window
    for channeli, channel in enumerate(channels):
        omezarr_channel = {'label': channel.get('label', channel.get('Name', f'{channeli}')), 'active': True}
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            omezarr_channel['color'] = rgba_to_hexrgb(color)
        # Full representable range for the dtype
        # (renamed from min/max to avoid shadowing the builtins).
        if dtype.kind == 'f':
            range_min, range_max = 0, 1
        else:
            info = np.iinfo(dtype)
            range_min, range_max = info.min, info.max
        if starts and ends:
            start, end = starts[channeli], ends[channeli]
        else:
            start, end = range_min, range_max
        omezarr_channel['window'] = {'min': range_min, 'max': range_max, 'start': start, 'end': end}
        omezarr_channels.append(omezarr_channel)

    metadata = {
        'version': ome_version,
        'channels': omezarr_channels,
    }
    return metadata

create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None)

Create transformation metadata (scale and translation) for OME-Zarr.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required
pixel_size_um dict

Pixel size in micrometers per dimension.

required
scale float

Scaling factor.

required
translation_um dict

Translation in micrometers per dimension.

None

Returns:

Name Type Description
list

List of transformation metadata dictionaries.

Source code in src/ome_zarr_util.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None):
    """
    Build OME-Zarr coordinate transformation metadata (scale, and optionally
    translation) for one pyramid level.

    Args:
        dimension_order (str): String of dimension characters (e.g. 'tczyx').
        pixel_size_um (dict): Pixel size in micrometers per dimension.
        scale (float): Scaling factor of this level.
        translation_um (dict, optional): Translation in micrometers per dimension.

    Returns:
        list: List of transformation metadata dictionaries.
    """
    scales = []
    translations = []
    for axis in dimension_order:
        axis_pixel_size = pixel_size_um.get(axis, 1)
        if axis_pixel_size == 0:
            # Guard against zero pixel size; fall back to unit spacing.
            axis_pixel_size = 1
        is_xy = axis in ('x', 'y')
        # Only x/y are downscaled, so their effective pixel size grows as scale shrinks.
        scales.append(axis_pixel_size / scale if is_xy else axis_pixel_size)

        if translation_um is not None:
            axis_translation = translation_um.get(axis, 0)
            if is_xy:
                axis_translation *= scale
            translations.append(axis_translation)

    transforms = [{'type': 'scale', 'scale': scales}]
    if translation_um is not None:
        transforms.append({'type': 'translation', 'translation': translations})
    return transforms

rgba_to_hexrgb(rgba)

Convert an RGB(A) color with components in [0, 1] to an uppercase hex RGB string; any alpha component is ignored.

Source code in src/color_conversion.py
16
17
18
def rgba_to_hexrgb(rgba: list) -> str:
    """Convert an RGB(A) color with components in [0, 1] to an uppercase hex RGB string (alpha dropped)."""
    return ''.join(format(int(component * 255), '02X') for component in rgba[:3])

scale_dimensions_dict(shape0, scale)

Scale x and y dimensions in a shape dictionary.

Parameters:

Name Type Description Default
shape0 dict

Original shape dictionary.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
dict

Scaled shape dictionary.

Source code in src/ome_zarr_util.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def scale_dimensions_dict(shape0, scale):
    """
    Scale the x and y entries of a shape dictionary.

    Args:
        shape0 (dict): Original shape dictionary.
        scale (float): Scaling factor.

    Returns:
        dict: Scaled shape dictionary (the original object when scale == 1).
    """
    if scale == 1:
        # No-op scaling: hand back the original dictionary untouched.
        return shape0
    return {dimension: int(size * scale) if dimension[0] in ('x', 'y') else size
            for dimension, size in shape0.items()}

scale_dimensions_xy(shape0, dimension_order, scale)

Scale x and y dimensions in a shape tuple.

Parameters:

Name Type Description Default
shape0 tuple

Original shape.

required
dimension_order str

String of dimension characters.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
list

Scaled shape.

Source code in src/ome_zarr_util.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def scale_dimensions_xy(shape0, dimension_order, scale):
    """
    Scale the x and y entries of a shape sequence.

    Args:
        shape0 (tuple): Original shape.
        dimension_order (str): String of dimension characters.
        scale (float): Scaling factor.

    Returns:
        list: Scaled shape (the original object when scale == 1).
    """
    if scale == 1:
        # No-op scaling: hand back the original sequence untouched.
        return shape0
    return [int(size * scale) if dimension[0] in ('x', 'y') else size
            for size, dimension in zip(shape0, dimension_order)]

OmeZarrWriter

PYRAMID_DOWNSCALE = 2 module-attribute

PYRAMID_LEVELS = 6 module-attribute

RETRY_ATTEMPTS = 3 module-attribute

TIFF_COMPRESSION = 'LZW' module-attribute

TILE_SIZE = 1024 module-attribute

VERSION = 'v0.1.10' module-attribute

ZARR_CHUNK_SIZE = TILE_SIZE module-attribute

ZARR_SHARD_MULTIPLIER = 10 module-attribute

OmeZarrWriter

Bases: OmeWriter

Writer for exporting image or screen data to OME-Zarr format. Supports both single images and high-content screening (HCS) plates.

Source code in src/OmeZarrWriter.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class OmeZarrWriter(OmeWriter):
    """
    Writer for exporting image or screen data to OME-Zarr format.
    Supports both single images and high-content screening (HCS) plates.

    Data is always written in 'tczyx' dimension order, as a resolution
    pyramid with PYRAMID_LEVELS levels downscaled by PYRAMID_DOWNSCALE.
    """

    def __init__(self, zarr_version=2, ome_version='0.4', verbose=False):
        """
        Initialize the OmeZarrWriter.

        Args:
            zarr_version (int): Zarr format version (2 or 3).
            ome_version (str): OME-Zarr metadata version ('0.4' or '0.5').
            verbose (bool): If True, print additional information.
        """
        super().__init__()
        self.zarr_version = zarr_version
        self.ome_version = ome_version
        # Map the requested OME-NGFF version to its ome_zarr format object;
        # any other version string leaves ome_format as None (library default).
        if ome_version == '0.4':
            from ome_zarr.format import FormatV04
            self.ome_format = FormatV04()
        elif ome_version == '0.5':
            from ome_zarr.format import FormatV05
            self.ome_format = FormatV05()
        else:
            self.ome_format = None
        self.verbose = verbose
        self.dim_order = 'tczyx'

    def write(self, filepath, source, **kwargs):
        """
        Write the provided source data to an OME-Zarr file.

        Args:
            filepath (str): Output path for the Zarr file.
            source: source reader supporting required interface.
            **kwargs: Additional arguments (e.g. wells selection).

        Returns:
            dict: Containing output_path (str) and total_size (int, bytes written).
        """
        if source.is_screen():
            zarr_root, total_size = self._write_screen(filepath, source, **kwargs)
        else:
            zarr_root, total_size = self._write_image(filepath, source, **kwargs)

        # Tag the output with creator provenance for downstream tooling.
        zarr_root.attrs['_creator'] = {'name': 'nl.biomero.OmeZarrWriter', 'version': VERSION}

        if self.verbose:
            print(f'Total data written: {print_hbytes(total_size)}')

        return {'output_path': filepath, 'total_size':total_size}

    def _write_screen(self, filepath, source, **kwargs):
        """
        Write a high-content screening (HCS) plate to OME-Zarr.

        Args:
            filepath (str): Output path for the Zarr file.
            source: source reader supporting required interface.
            **kwargs: Additional arguments (e.g., wells).

        Returns:
            tuple: (zarr_root, total_size) where zarr_root is the root group and total_size is bytes written.
        """
        #zarr_location = parse_url(filename, mode='w', fmt=self.ome_format)
        zarr_location = filepath
        # NOTE(review): confirm the installed zarr release still accepts the
        # 'zarr_version' keyword (newer releases renamed it to 'zarr_format').
        zarr_root = zarr.open_group(zarr_location, mode='w', zarr_version=self.zarr_version)

        # Plate grid covers rows 'A'..highest row used and columns 1..highest
        # column used, even if intermediate wells are absent from the source.
        row_names = [chr(ord('A') + index) for index
                     in range(max([ord(row_name.upper()) - ord('A') for row_name in source.get_rows()]) + 1)]
        col_names = [str(index) for index
                     in range(1, max([int(col) for col in source.get_columns()]) + 1)]
        wells = kwargs.get('wells', source.get_wells())
        well_paths = ['/'.join(split_well_name(well)) for well in wells]
        fields = list(map(str, source.get_fields()))

        acquisitions = source.get_acquisitions()
        name = source.get_name()
        write_plate_metadata(zarr_root, row_names, col_names, well_paths,
                             name=name, field_count=len(fields), acquisitions=acquisitions,
                             fmt=self.ome_format)
        total_size = 0
        # Hierarchy: plate root -> row group -> column (well) group -> field image group.
        for well_id in wells:
            row, col = split_well_name(well_id)
            row_group = zarr_root.require_group(str(row))
            well_group = row_group.require_group(str(col))
            write_well_metadata(well_group, fields, fmt=self.ome_format)
            position = source.get_position_um(well_id)
            for field_id in fields:
                image_group = well_group.require_group(field_id)
                data = source.get_data(self.dim_order, well_id=well_id, field_id=field_id)
                window_scanner = WindowScanner()
                window = source.get_image_window(window_scanner, well_id=well_id, field_id=field_id, data=data)
                size = self._write_data(image_group, data, source, window, position=position)
                total_size += size

        return zarr_root, total_size

    def _write_image(self, filepath, source, **kwargs):
        """
        Write a single image to OME-Zarr.

        Args:
            filepath (str): Output path for the Zarr file.
            source: source reader for image data.
            **kwargs: Additional arguments.

        Returns:
            tuple: (zarr_root, size) where zarr_root is the root group and size is bytes written.
        """
        #zarr_location = parse_url(filename, mode='w', fmt=self.ome_format)
        zarr_location = filepath
        # NOTE(review): see _write_screen about the 'zarr_version' keyword.
        zarr_root = zarr.open_group(zarr_location, mode='w', zarr_version=self.zarr_version)

        # Pre-build PYRAMID_LEVELS + 1 resolution levels: reuse the closest
        # native source level and resize x/y by the residual factor when the
        # source has no exact match.
        pyramid_data = []
        scale = 1
        for _ in range(PYRAMID_LEVELS + 1):
            level, rescale = get_level_from_scale(source.get_scales(), scale)
            data = source.get_data_as_dask(self.dim_order, level=level)
            if rescale != 1:
                shape = list(data.shape)
                shape[-2:] = np.multiply(shape[-2:], rescale).astype(int)
                data = dask_utils.resize(data, shape)
            pyramid_data.append(data)
            scale /= PYRAMID_DOWNSCALE

        window_scanner = WindowScanner()
        window = source.get_image_window(window_scanner)
        size = self._write_data(zarr_root, pyramid_data, source, window, position=source.get_position_um())
        return zarr_root, size

    def _write_data(self, group, data, source, window, position=None):
        """
        Write image data and metadata to a Zarr group.

        Args:
            group: Zarr group to write into.
            data: Image data array, or a list of arrays (pre-built pyramid).
            source: source reader.
            window: Image window information.
            position: Optional position information (used as translation metadata).

        Returns:
            int: Number of bytes written (full-resolution level only).
        """
        dim_order = self.dim_order
        dtype = source.get_dtype()
        channels = source.get_channels()
        nchannels = source.get_nchannels()
        is_rgb = source.is_rgb()

        axes = create_axes_metadata(dim_order)
        pixel_size_scales, scaler = self._create_scale_metadata(source, dim_order, position)
        metadata = {'omero': create_channel_metadata(dtype, channels, nchannels, is_rgb, window, self.ome_version),
                    'metadata': {'method': scaler.method}}

        # A list marks a pre-built pyramid; a single array is downscaled by
        # the ome_zarr scaler at write time instead.
        is_pyramid = isinstance(data, list)
        if is_pyramid:
            data0 = data[0]
        else:
            data0 = data
        storage_options = None
        if self.zarr_version >= 3:
            # For zarr v3 with non-dask data (no chunksize attribute), request
            # explicit chunking: ZARR_CHUNK_SIZE along x/y, 1 along all other
            # dimensions, with shards ZARR_SHARD_MULTIPLIER times larger.
            if not hasattr(data0, 'chunksize'):
                chunks = []
                shards = []
                for dim, n in zip(dim_order, data0.shape):
                    if dim in 'xy':
                        chunks += [ZARR_CHUNK_SIZE]
                        shards += [ZARR_CHUNK_SIZE * ZARR_SHARD_MULTIPLIER]
                    else:
                        chunks += [1]
                        shards += [1]
                storage_options = {'chunks': chunks, 'shards': shards}

        # Reported size counts only the level-0 array (uncompressed nbytes).
        size = data0.size * data0.itemsize
        if is_pyramid:
            #images = [Image.fromarray(data1) for data1 in data]
            #ngff_zarr.from_ngff_zarr() # use this to see construction
            #axes1 = [Axis()]
            #datasets1 = [Dataset()]
            #coordinateTransformations1 = Transform()
            #metadata = Metadata(axes1, datasets1, coordinateTransformations1)
            #multiscales = Multiscales(images, metadata)
            #ngff_zarr.to_ngff_zarr(group, multiscales=multiscales)

            write_multiscale(pyramid=data, group=group, axes=axes, coordinate_transformations=pixel_size_scales,
                            fmt=self.ome_format, storage_options=storage_options,
                            name=source.get_name(), metadata=metadata)
        else:
            write_image(image=data, group=group, axes=axes, coordinate_transformations=pixel_size_scales,
                        scaler=scaler, fmt=self.ome_format, storage_options=storage_options,
                        name=source.get_name(), metadata=metadata)
        return size

    def _create_scale_metadata(self, source, dim_order, translation, scaler=None):
        """
        Create coordinate transformation metadata for multiscale images.

        Args:
            source: source reader.
            dim_order (str): Dimension order string.
            translation: Translation or position information.
            scaler: Optional Scaler object; a default pyramid scaler is created when None.

        Returns:
            tuple: (pixel_size_scales, scaler)
        """
        if scaler is None:
            scaler = Scaler(downscale=PYRAMID_DOWNSCALE, max_layer=PYRAMID_LEVELS)
        # One transformation entry per pyramid level, halving scale each time.
        pixel_size_scales = []
        scale = 1
        for i in range(scaler.max_layer + 1):
            pixel_size_scales.append(
                create_transformation_metadata(dim_order, source.get_pixel_size_um(),
                                               scale, translation))
            scale /= scaler.downscale
        return pixel_size_scales, scaler

dim_order = 'tczyx' instance-attribute

ome_format = FormatV04() instance-attribute

ome_version = ome_version instance-attribute

verbose = verbose instance-attribute

zarr_version = zarr_version instance-attribute

__init__(zarr_version=2, ome_version='0.4', verbose=False)

Initialize the OmeZarrWriter.

Parameters:

Name Type Description Default
zarr_version int

Zarr format version (2 or 3).

2
ome_version str

OME-Zarr metadata version ('0.4' or '0.5').

'0.4'
verbose bool

If True, print additional information.

False
Source code in src/OmeZarrWriter.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(self, zarr_version=2, ome_version='0.4', verbose=False):
    """
    Initialize the OmeZarrWriter.

    Args:
        zarr_version (int): Zarr format version (2 or 3).
        ome_version (str): OME-Zarr metadata version ('0.4' or '0.5').
        verbose (bool): If True, print additional information.
    """
    super().__init__()
    self.zarr_version = zarr_version
    self.ome_version = ome_version
    # Resolve the matching ome_zarr format object; import lazily so only
    # the needed format class is loaded. Unknown versions yield None.
    if ome_version == '0.5':
        from ome_zarr.format import FormatV05
        ome_format = FormatV05()
    elif ome_version == '0.4':
        from ome_zarr.format import FormatV04
        ome_format = FormatV04()
    else:
        ome_format = None
    self.ome_format = ome_format
    self.verbose = verbose
    self.dim_order = 'tczyx'

write(filepath, source, **kwargs)

Write the provided source data to an OME-Zarr file.

Parameters:

Name Type Description Default
filepath str

Output path for the Zarr file.

required
source

source reader supporting required interface.

required
**kwargs

Additional arguments (e.g. wells selection).

{}

Returns:

Name Type Description
dict

Containing output_path: str Output file path.

Source code in src/OmeZarrWriter.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def write(self, filepath, source, **kwargs):
    """
    Write the provided source data to an OME-Zarr file.

    Args:
        filepath (str): Output path for the Zarr file.
        source: source reader supporting required interface.
        **kwargs: Additional arguments (e.g. wells selection).

    Returns:
        dict: Containing output_path (str) and total_size (int, bytes written).
    """
    # Dispatch to the plate or single-image writer based on the source type.
    writer = self._write_screen if source.is_screen() else self._write_image
    zarr_root, total_size = writer(filepath, source, **kwargs)

    # Tag the output with creator provenance for downstream tooling.
    zarr_root.attrs['_creator'] = {'name': 'nl.biomero.OmeZarrWriter', 'version': VERSION}

    if self.verbose:
        print(f'Total data written: {print_hbytes(total_size)}')

    return {'output_path': filepath, 'total_size': total_size}

create_axes_metadata(dimension_order)

Create axes metadata for OME-Zarr from dimension order.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required

Returns:

Name Type Description
list

List of axis metadata dictionaries.

Source code in src/ome_zarr_util.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def create_axes_metadata(dimension_order):
    """
    Create axes metadata for OME-Zarr from dimension order.

    Args:
        dimension_order (str): String of dimension characters.

    Returns:
        list: List of axis metadata dictionaries.
    """
    axes = []
    for dimension in dimension_order:
        if dimension == 't':
            axis_type, unit = 'time', 'millisecond'
        elif dimension == 'c':
            # Channel axes carry no physical unit.
            axis_type, unit = 'channel', None
        else:
            axis_type, unit = 'space', 'micrometer'
        axis = {'name': dimension, 'type': axis_type}
        if unit:
            axis['unit'] = unit
        axes.append(axis)
    return axes

create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version)

Create channel metadata for OME-Zarr.

Parameters:

Name Type Description Default
dtype

Numpy dtype of image data.

required
channels list

List of channel dicts.

required
nchannels int

Number of channels.

required
is_rgb bool

Whether the image data is RGB(A).

required
window tuple

Min/max window values.

required
ome_version str

OME-Zarr version.

required

Returns:

Name Type Description
dict

Channel metadata dictionary.

Source code in src/ome_zarr_util.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version):
    """
    Create 'omero' channel metadata for OME-Zarr.

    Args:
        dtype: Numpy dtype of image data.
        channels (list): List of channel dicts ('label'/'Name', 'color'/'Color' keys).
        nchannels (int): Number of channels in the image data.
        is_rgb (bool): Whether the image data is RGB(A).
        window (tuple): (starts, ends) per-channel display window value lists;
            empty lists fall back to the full dtype range.
        ome_version (str): OME-Zarr metadata version.

    Returns:
        dict: Channel metadata dictionary with 'version' and 'channels' keys.
    """
    if len(channels) < nchannels:
        # Synthesize default channel entries; only RGB(A) images get labeled
        # defaults — otherwise the channel list stays empty.
        labels = []
        colors = []
        if is_rgb and nchannels in (3, 4):
            labels = ['Red', 'Green', 'Blue']
            colors = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
        if is_rgb and nchannels == 4:
            labels += ['Alpha']
            colors += [(1, 1, 1)]
        channels = [{'label': label, 'color': color} for label, color in zip(labels, colors)]

    omezarr_channels = []
    starts, ends = window
    for channeli, channel in enumerate(channels):
        omezarr_channel = {'label': channel.get('label', channel.get('Name', f'{channeli}')), 'active': True}
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            omezarr_channel['color'] = rgba_to_hexrgb(color)
        # Display limits: [0, 1] for float data, the full integer range otherwise.
        # (Renamed from min/max to avoid shadowing the builtins.)
        if dtype.kind == 'f':
            range_min, range_max = 0, 1
        else:
            info = np.iinfo(dtype)
            range_min, range_max = info.min, info.max
        if starts and ends:
            start, end = starts[channeli], ends[channeli]
        else:
            start, end = range_min, range_max
        omezarr_channel['window'] = {'min': range_min, 'max': range_max, 'start': start, 'end': end}
        omezarr_channels.append(omezarr_channel)

    metadata = {
        'version': ome_version,
        'channels': omezarr_channels,
    }
    return metadata

create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None)

Create transformation metadata (scale and translation) for OME-Zarr.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required
pixel_size_um dict

Pixel size in micrometers per dimension.

required
scale float

Scaling factor.

required
translation_um dict

Translation in micrometers per dimension.

None

Returns:

Name Type Description
list

List of transformation metadata dictionaries.

Source code in src/ome_zarr_util.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None):
    """
    Build OME-Zarr coordinate transformation metadata (scale, and optionally
    translation) for one pyramid level.

    Args:
        dimension_order (str): String of dimension characters (e.g. 'tczyx').
        pixel_size_um (dict): Pixel size in micrometers per dimension.
        scale (float): Scaling factor of this level.
        translation_um (dict, optional): Translation in micrometers per dimension.

    Returns:
        list: List of transformation metadata dictionaries.
    """
    scales = []
    translations = []
    for axis in dimension_order:
        axis_pixel_size = pixel_size_um.get(axis, 1)
        if axis_pixel_size == 0:
            # Guard against zero pixel size; fall back to unit spacing.
            axis_pixel_size = 1
        is_xy = axis in ('x', 'y')
        # Only x/y are downscaled, so their effective pixel size grows as scale shrinks.
        scales.append(axis_pixel_size / scale if is_xy else axis_pixel_size)

        if translation_um is not None:
            axis_translation = translation_um.get(axis, 0)
            if is_xy:
                axis_translation *= scale
            translations.append(axis_translation)

    transforms = [{'type': 'scale', 'scale': scales}]
    if translation_um is not None:
        transforms.append({'type': 'translation', 'translation': translations})
    return transforms

rgba_to_hexrgb(rgba)

Convert an RGB(A) color with components in [0, 1] to an uppercase hex RGB string; any alpha component is ignored.

Source code in src/color_conversion.py
16
17
18
def rgba_to_hexrgb(rgba: list) -> str:
    """Convert an RGB(A) color with components in [0, 1] to an uppercase hex RGB string (alpha dropped)."""
    return ''.join(format(int(component * 255), '02X') for component in rgba[:3])

scale_dimensions_dict(shape0, scale)

Scale x and y dimensions in a shape dictionary.

Parameters:

Name Type Description Default
shape0 dict

Original shape dictionary.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
dict

Scaled shape dictionary.

Source code in src/ome_zarr_util.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def scale_dimensions_dict(shape0, scale):
    """
    Scale the x and y entries of a shape dictionary.

    Args:
        shape0 (dict): Original shape dictionary.
        scale (float): Scaling factor.

    Returns:
        dict: Scaled shape dictionary (the original object when scale == 1).
    """
    if scale == 1:
        # No-op scaling: hand back the original dictionary untouched.
        return shape0
    return {dimension: int(size * scale) if dimension[0] in ('x', 'y') else size
            for dimension, size in shape0.items()}

scale_dimensions_xy(shape0, dimension_order, scale)

Scale x and y dimensions in a shape tuple.

Parameters:

Name Type Description Default
shape0 tuple

Original shape.

required
dimension_order str

String of dimension characters.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
list

Scaled shape.

Source code in src/ome_zarr_util.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def scale_dimensions_xy(shape0, dimension_order, scale):
    """
    Scale the x and y entries of a shape sequence.

    Args:
        shape0 (tuple): Original shape.
        dimension_order (str): String of dimension characters.
        scale (float): Scaling factor.

    Returns:
        list: Scaled shape (the original object when scale == 1).
    """
    if scale == 1:
        # No-op scaling: hand back the original sequence untouched.
        return shape0
    return [int(size * scale) if dimension[0] in ('x', 'y') else size
            for size, dimension in zip(shape0, dimension_order)]

TiffSource

TiffSource

Bases: ImageSource

Loads image and metadata from TIFF or OME-TIFF files.

Source code in src/TiffSource.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
class TiffSource(ImageSource):
    """
    Loads image and metadata from TIFF or OME-TIFF files.
    """
    def __init__(self, uri, metadata=None):
        """
        Initialize TiffSource.

        Args:
            uri (str): Path to the TIFF file.
            metadata (dict, optional): Optional metadata dictionary.
        """
        # Use None instead of a mutable default argument: a shared default
        # dict would leak metadata between instances.
        if metadata is None:
            metadata = {}
        super().__init__(uri, metadata)
        image_filename = None
        ext = os.path.splitext(uri)[1].lower()
        if 'tif' in ext:
            image_filename = uri
        elif 'ome' in ext:
            # Companion OME metadata file: read its metadata, then resolve
            # the referenced TIFF image files next to it.
            with open(uri, 'rb') as file:
                self.metadata = metadata_to_dict(file.read().decode())
            self.image_filenames = {}
            for image in ensure_list(self.metadata.get('Image', {})):
                filename = image.get('Pixels', {}).get('TiffData', {}).get('UUID', {}).get('FileName')
                if filename:
                    filepath = os.path.join(os.path.dirname(uri), filename)
                    self.image_filenames[image['ID']] = filepath
                    if image_filename is None:
                        image_filename = filepath
        else:
            raise RuntimeError(f'Unsupported tiff extension: {ext}')

        self.tiff = TiffFile(image_filename)

    def init_metadata(self):
        """
        Initializes and loads metadata from the (OME) TIFF file.

        Returns:
            dict: Metadata dictionary.
        """
        self.is_ome = self.tiff.is_ome
        self.is_imagej = self.tiff.is_imagej
        pixel_size = {}
        position = {}
        channels = []

        if self.tiff.series:
            pages = self.tiff.series
            page = pages[0]
        else:
            pages = self.tiff.pages
            page = self.tiff.pages.first
        if hasattr(page, 'levels'):
            # Pyramidal image: each resolution level acts as one "page".
            pages = page.levels
        self.shapes = [page.shape for page in pages]
        self.shape = page.shape
        # Normalize axes: samples ('s') count as channels, remove 'r'.
        self.dim_order = page.axes.lower().replace('s', 'c').replace('r', '')
        x_index, y_index = self.dim_order.index('x'), self.dim_order.index('y')
        self.scales = [float(np.mean([shape[x_index] / self.shape[x_index], shape[y_index] / self.shape[y_index]]))
                       for shape in self.shapes]
        self.is_photometric_rgb = (self.tiff.pages.first.photometric == PHOTOMETRIC.RGB)
        self.nchannels = self.shape[self.dim_order.index('c')] if 'c' in self.dim_order else 1

        if self.is_ome:
            metadata = metadata_to_dict(self.tiff.ome_metadata)
            # 'BinaryOnly' marks a file whose full metadata lives in a
            # companion file; keep the metadata we already have in that case.
            if metadata and not 'BinaryOnly' in metadata:
                self.metadata = metadata
            image0 = ensure_list(self.metadata.get('Image', []))[0]
            self.is_plate = 'Plate' in self.metadata
            if self.is_plate:
                plate = self.metadata['Plate']
                self.name = plate.get('Name')
                rows = set()
                columns = set()
                wells = {}
                fields = []
                image_refs = {}
                for well in ensure_list(plate['Well']):
                    row = create_row_col_label(well['Row'], plate['RowNamingConvention'])
                    column = create_row_col_label(well['Column'], plate['ColumnNamingConvention'])
                    rows.add(row)
                    columns.add(column)
                    label = f'{row}{column}'
                    wells[label] = well['ID']
                    image_refs[label] = {}
                    for sample in ensure_list(well.get('WellSample')):
                        index = sample.get('Index', 0)
                        image_refs[label][str(index)] = sample['ImageRef']['ID']
                        if index not in fields:
                            fields.append(index)
                self.rows = sorted(rows)
                # NOTE(review): columns are left unsorted while rows are
                # sorted — confirm whether callers rely on column order.
                self.columns = list(columns)
                self.wells = list(wells.keys())
                self.fields = fields
                self.image_refs = image_refs
            else:
                self.name = image0.get('Name')
            if not self.name:
                self.name = get_filetitle(self.uri)
            # Strip known extensions as suffixes. str.rstrip() strips a
            # *character set*, not a suffix, and would mangle names ending
            # in any of the letters e/f/i/m/o/t (e.g. 'tomato' -> 'toma').
            name = str(self.name)
            for suffix in ('.tiff', '.tif', '.ome'):
                if name.endswith(suffix):
                    name = name[:-len(suffix)]
            self.name = name
            pixels = image0.get('Pixels', {})
            self.dtype = np.dtype(pixels['Type'])
            if 'PhysicalSizeX' in pixels:
                pixel_size['x'] = convert_to_um(float(pixels.get('PhysicalSizeX')), pixels.get('PhysicalSizeXUnit'))
            if 'PhysicalSizeY' in pixels:
                pixel_size['y'] = convert_to_um(float(pixels.get('PhysicalSizeY')), pixels.get('PhysicalSizeYUnit'))
            if 'PhysicalSizeZ' in pixels:
                pixel_size['z'] = convert_to_um(float(pixels.get('PhysicalSizeZ')), pixels.get('PhysicalSizeZUnit'))
            plane = pixels.get('Plane')
            if plane:
                if 'PositionX' in plane:
                    position['x'] = convert_to_um(float(plane.get('PositionX')), plane.get('PositionXUnit'))
                if 'PositionY' in plane:
                    position['y'] = convert_to_um(float(plane.get('PositionY')), plane.get('PositionYUnit'))
                if 'PositionZ' in plane:
                    position['z'] = convert_to_um(float(plane.get('PositionZ')), plane.get('PositionZUnit'))
            for channel0 in ensure_list(pixels.get('Channel')):
                channel = {}
                if 'Name' in channel0:
                    channel['label'] = channel0['Name']
                if 'Color' in channel0:
                    channel['color'] = int_to_rgba(channel0['Color'])
                channels.append(channel)
        else:
            self.is_plate = False
            if self.is_imagej:
                self.imagej_metadata = self.tiff.imagej_metadata
                # Decode escaped unicode in the unit string (e.g. '\\u00b5m').
                pixel_size_unit = self.imagej_metadata.get('unit', '').encode().decode('unicode_escape')
                if 'scales' in self.imagej_metadata:
                    for dim, scale in zip(['x', 'y'], self.imagej_metadata['scales'].split(',')):
                        scale = scale.strip()
                        if scale != '':
                            pixel_size[dim] = convert_to_um(float(scale), pixel_size_unit)
                if 'spacing' in self.imagej_metadata:
                    pixel_size['z'] = convert_to_um(self.imagej_metadata['spacing'], pixel_size_unit)
            self.metadata = tags_to_dict(self.tiff.pages.first.tags)
            self.name = os.path.splitext(self.tiff.filename)[0]
            self.dtype = page.dtype
            res_unit = self.metadata.get('ResolutionUnit', '')
            if isinstance(res_unit, Enum):
                res_unit = res_unit.name
            res_unit = res_unit.lower()
            if res_unit == 'none':
                res_unit = ''
            # TIFF resolution tags are pixels-per-unit; invert to get
            # the physical size per pixel.
            if 'x' not in pixel_size:
                res0 = convert_rational_value(self.metadata.get('XResolution'))
                if res0 is not None and res0 != 0:
                    pixel_size['x'] = convert_to_um(1 / res0, res_unit)
            if 'y' not in pixel_size:
                res0 = convert_rational_value(self.metadata.get('YResolution'))
                if res0 is not None and res0 != 0:
                    pixel_size['y'] = convert_to_um(1 / res0, res_unit)
        self.pixel_size = pixel_size
        self.position = position
        self.channels = channels
        return self.metadata

    def is_screen(self):
        """
        Returns True when the source is a (plate) screen.
        """
        return self.is_plate

    def get_shape(self):
        """
        Returns the full-resolution image shape.
        """
        return self.shape

    def get_shapes(self):
        """
        Returns the shapes of all pyramid levels.
        """
        return self.shapes

    def get_scales(self):
        """
        Returns the relative scale of each pyramid level (level 0 = 1.0).
        """
        return self.scales

    def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
        """
        Reads image data, optionally for a specific well/field of a plate.

        Args:
            dim_order (str): Desired dimension order of the returned data.
            level (int): Pyramid level to read.
            well_id (str): Well label (plate data only).
            field_id: Field index within the well (plate data only).

        Returns:
            ndarray: Image data in the requested dimension order.
        """
        if well_id is not None:
            index = self.image_refs[well_id][str(field_id)]
            # Close the per-well file after reading to avoid leaking handles.
            with TiffFile(self.image_filenames[index]) as tiff:
                data = tiff.asarray(level=level)
        else:
            data = self.tiff.asarray(level=level)
        return redimension_data(data, self.dim_order, dim_order)

    def get_data_as_dask(self, dim_order, level=0, **kwargs):
        """
        Reads image data lazily as a dask array.

        Args:
            dim_order (str): Desired dimension order of the returned data.
            level (int): Pyramid level to read.

        Returns:
            dask.array.Array: Lazily-loaded image data.
        """
        data = da.from_zarr(imread(self.uri, level=level, aszarr=True))
        if data.chunksize == data.shape:
            # A single chunk defeats lazy loading; rechunk to tiles.
            data = data.rechunk(TILE_SIZE)
        return redimension_data(data, self.dim_order, dim_order)

    def get_name(self):
        """
        Gets the image or plate name.

        Returns:
            str: Name.
        """
        return self.name

    def get_dim_order(self):
        """
        Returns the dimension order string.

        Returns:
            str: Dimension order.
        """
        return self.dim_order

    def get_dtype(self):
        """
        Returns the numpy dtype of the image data.

        Returns:
            dtype: Numpy dtype.
        """
        return self.dtype

    def get_pixel_size_um(self):
        """
        Returns the pixel size in micrometers.

        Returns:
            dict: Pixel size for x, y, (and z); defaults to 1 um when unknown.
        """
        if self.pixel_size:
            return self.pixel_size
        else:
            return {'x': 1, 'y': 1}

    def get_position_um(self, well_id=None):
        """
        Returns the position in micrometers.

        Returns:
            dict: Position in micrometers.
        """
        return self.position

    def get_channels(self):
        """
        Returns channel metadata.

        Returns:
            list: List of channel dicts.
        """
        return self.channels

    def get_nchannels(self):
        """
        Returns the number of channels.

        Returns:
            int: Number of channels.
        """
        return self.nchannels

    def is_rgb(self):
        """
        Check if the source is a RGB(A) image.
        """
        return self.is_photometric_rgb

    def get_rows(self):
        """
        Returns the list of row identifiers.

        Returns:
            list: Row identifiers.
        """
        return self.rows

    def get_columns(self):
        """
        Returns the list of column identifiers.

        Returns:
            list: Column identifiers.
        """
        return self.columns

    def get_wells(self):
        """
        Returns the list of well identifiers.

        Returns:
            list: Well identifiers.
        """
        return self.wells

    def get_time_points(self):
        """
        Returns the list of time points.

        Returns:
            list: Time point IDs.
        """
        nt = 1
        if 't' in self.dim_order:
            t_index = self.dim_order.index('t')
            nt = self.tiff.pages.first.shape[t_index]
        return list(range(nt))

    def get_fields(self):
        """
        Returns the list of field indices.

        Returns:
            list: Field indices.
        """
        return self.fields

    def get_acquisitions(self):
        """
        Returns acquisition metadata (empty for TIFF).

        Returns:
            list: Empty list.
        """
        return []

    def get_total_data_size(self):
        """
        Returns the estimated total data size.

        Returns:
            int: Estimated total size as an element count (not bytes).
        """
        total_size = np.prod(self.shape)
        if self.is_plate:
            total_size *= len(self.get_wells()) * len(self.get_fields())
        return total_size

    def close(self):
        """
        Closes the TIFF file.
        """
        self.tiff.close()

image_filenames = {} instance-attribute

metadata = metadata_to_dict(file.read().decode()) instance-attribute

tiff = TiffFile(image_filename) instance-attribute

__init__(uri, metadata={})

Initialize TiffSource.

Parameters:

Name Type Description Default
uri str

Path to the TIFF file.

required
metadata dict

Optional metadata dictionary.

{}
Source code in src/TiffSource.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, uri, metadata={}):
    """
    Initialize TiffSource.

    Args:
        uri (str): Path to the TIFF file.
        metadata (dict): Optional metadata dictionary.
    """
    super().__init__(uri, metadata)
    image_filename = None
    ext = os.path.splitext(uri)[1].lower()
    if 'tif' in ext:
        image_filename = uri
    elif 'ome' in ext:
        # read metadata
        with open(uri, 'rb') as file:
            self.metadata = metadata_to_dict(file.read().decode())
        # try to open a ome-tiff file
        self.image_filenames = {}
        for image in ensure_list(self.metadata.get('Image', {})):
            filename = image.get('Pixels', {}).get('TiffData', {}).get('UUID', {}).get('FileName')
            if filename:
                filepath = os.path.join(os.path.dirname(uri), filename)
                self.image_filenames[image['ID']] = filepath
                if image_filename is None:
                    image_filename = filepath
    else:
        raise RuntimeError(f'Unsupported tiff extension: {ext}')

    self.tiff = TiffFile(image_filename)

close()

Closes the TIFF file.

Source code in src/TiffSource.py
344
345
346
347
348
def close(self):
    """
    Closes the TIFF file.
    """
    self.tiff.close()

get_acquisitions()

Returns acquisition metadata (empty for TIFF).

Returns:

Name Type Description
list

Empty list.

Source code in src/TiffSource.py
323
324
325
326
327
328
329
330
def get_acquisitions(self):
    """
    Returns acquisition metadata (empty for TIFF).

    Returns:
        list: Empty list.
    """
    return []

get_channels()

Returns channel metadata.

Returns:

Name Type Description
list

List of channel dicts.

Source code in src/TiffSource.py
250
251
252
253
254
255
256
257
def get_channels(self):
    """
    Returns channel metadata.

    Returns:
        list: List of channel dicts.
    """
    return self.channels

get_columns()

Returns the list of column identifiers.

Returns:

Name Type Description
list

Column identifiers.

Source code in src/TiffSource.py
283
284
285
286
287
288
289
290
def get_columns(self):
    """
    Returns the list of column identifiers.

    Returns:
        list: Column identifiers.
    """
    return self.columns

get_data(dim_order, level=0, well_id=None, field_id=None, **kwargs)

Source code in src/TiffSource.py
185
186
187
188
189
190
191
192
def get_data(self, dim_order, level=0, well_id=None, field_id=None, **kwargs):
    if well_id is not None:
        index = self.image_refs[well_id][str(field_id)]
        tiff = TiffFile(self.image_filenames[index])
    else:
        tiff = self.tiff
    data = tiff.asarray(level=level)
    return redimension_data(data, self.dim_order, dim_order)

get_data_as_dask(dim_order, level=0, **kwargs)

Source code in src/TiffSource.py
194
195
196
197
198
199
200
def get_data_as_dask(self, dim_order, level=0, **kwargs):
    #lazy_array = dask.delayed(imread)(self.uri, level=level)
    #data = da.from_delayed(lazy_array, shape=self.shapes[level], dtype=self.dtype)
    data = da.from_zarr(imread(self.uri, level=level, aszarr=True))
    if data.chunksize == data.shape:
        data = data.rechunk(TILE_SIZE)
    return redimension_data(data, self.dim_order, dim_order)

get_dim_order()

Returns the dimension order string.

Returns:

Name Type Description
str

Dimension order.

Source code in src/TiffSource.py
211
212
213
214
215
216
217
218
def get_dim_order(self):
    """
    Returns the dimension order string.

    Returns:
        str: Dimension order.
    """
    return self.dim_order

get_dtype()

Returns the numpy dtype of the image data.

Returns:

Name Type Description
dtype

Numpy dtype.

Source code in src/TiffSource.py
220
221
222
223
224
225
226
227
def get_dtype(self):
    """
    Returns the numpy dtype of the image data.

    Returns:
        dtype: Numpy dtype.
    """
    return self.dtype

get_fields()

Returns the list of field indices.

Returns:

Name Type Description
list

Field indices.

Source code in src/TiffSource.py
314
315
316
317
318
319
320
321
def get_fields(self):
    """
    Returns the list of field indices.

    Returns:
        list: Field indices.
    """
    return self.fields

get_name()

Gets the image or plate name.

Returns:

Name Type Description
str

Name.

Source code in src/TiffSource.py
202
203
204
205
206
207
208
209
def get_name(self):
    """
    Gets the image or plate name.

    Returns:
        str: Name.
    """
    return self.name

get_nchannels()

Returns the number of channels.

Returns:

Name Type Description
int

Number of channels.

Source code in src/TiffSource.py
259
260
261
262
263
264
265
266
def get_nchannels(self):
    """
    Returns the number of channels.

    Returns:
        int: Number of channels.
    """
    return self.nchannels

get_pixel_size_um()

Returns the pixel size in micrometers.

Returns:

Name Type Description
dict

Pixel size for x, y, (and z).

Source code in src/TiffSource.py
229
230
231
232
233
234
235
236
237
238
239
def get_pixel_size_um(self):
    """
    Returns the pixel size in micrometers.

    Returns:
        dict: Pixel size for x, y, (and z).
    """
    if self.pixel_size:
        return self.pixel_size
    else:
        return {'x': 1, 'y': 1}

get_position_um(well_id=None)

Returns the position in micrometers.

Returns:

Name Type Description
dict

Position in micrometers.

Source code in src/TiffSource.py
241
242
243
244
245
246
247
248
def get_position_um(self, well_id=None):
    """
    Returns the position in micrometers.

    Returns:
        dict: Position in micrometers.
    """
    return self.position

get_rows()

Returns the list of row identifiers.

Returns:

Name Type Description
list

Row identifiers.

Source code in src/TiffSource.py
274
275
276
277
278
279
280
281
def get_rows(self):
    """
    Returns the list of row identifiers.

    Returns:
        list: Row identifiers.
    """
    return self.rows

get_scales()

Source code in src/TiffSource.py
182
183
def get_scales(self):
    return self.scales

get_shape()

Source code in src/TiffSource.py
176
177
def get_shape(self):
    return self.shape

get_shapes()

Source code in src/TiffSource.py
179
180
def get_shapes(self):
    return self.shapes

get_time_points()

Returns the list of time points.

Returns:

Name Type Description
list

Time point IDs.

Source code in src/TiffSource.py
301
302
303
304
305
306
307
308
309
310
311
312
def get_time_points(self):
    """
    Returns the list of time points.

    Returns:
        list: Time point IDs.
    """
    nt = 1
    if 't' in self.dim_order:
        t_index = self.dim_order.index('t')
        nt = self.tiff.pages.first.shape[t_index]
    return list(range(nt))

get_total_data_size()

Returns the estimated total data size.

Returns:

Name Type Description
int

Total data size in bytes.

Source code in src/TiffSource.py
332
333
334
335
336
337
338
339
340
341
342
def get_total_data_size(self):
    """
    Returns the estimated total data size.

    Returns:
        int: Total data size in bytes.
    """
    total_size = np.prod(self.shape)
    if self.is_plate:
        total_size *= len(self.get_wells()) * len(self.get_fields())
    return total_size

get_wells()

Returns the list of well identifiers.

Returns:

Name Type Description
list

Well identifiers.

Source code in src/TiffSource.py
292
293
294
295
296
297
298
299
def get_wells(self):
    """
    Returns the list of well identifiers.

    Returns:
        list: Well identifiers.
    """
    return self.wells

init_metadata()

Initializes and loads metadata from the (OME) TIFF file.

Returns:

Name Type Description
dict

Metadata dictionary.

Source code in src/TiffSource.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def init_metadata(self):
    """
    Initializes and loads metadata from the (OME) TIFF file.

    Returns:
        dict: Metadata dictionary.
    """
    self.is_ome = self.tiff.is_ome
    self.is_imagej = self.tiff.is_imagej
    pixel_size = {}
    position = {}
    channels = []

    if self.tiff.series:
        pages = self.tiff.series
        page = pages[0]
    else:
        pages = self.tiff.pages
        page = self.tiff.pages.first
    if hasattr(page, 'levels'):
        pages = page.levels
    self.shapes = [page.shape for page in pages]
    self.shape = page.shape
    self.dim_order = page.axes.lower().replace('s', 'c').replace('r', '')
    x_index, y_index = self.dim_order.index('x'), self.dim_order.index('y')
    self.scales = [float(np.mean([shape[x_index] / self.shape[x_index], shape[y_index] / self.shape[y_index]]))
                   for shape in self.shapes]
    self.is_photometric_rgb = (self.tiff.pages.first.photometric == PHOTOMETRIC.RGB)
    self.nchannels = self.shape[self.dim_order.index('c')] if 'c' in self.dim_order else 1

    if self.is_ome:
        metadata = metadata_to_dict(self.tiff.ome_metadata)
        if metadata and not 'BinaryOnly' in metadata:
            self.metadata = metadata
        image0 = ensure_list(self.metadata.get('Image', []))[0]
        self.is_plate = 'Plate' in self.metadata
        if self.is_plate:
            plate = self.metadata['Plate']
            self.name = plate.get('Name')
            rows = set()
            columns = set()
            wells = {}
            fields = []
            image_refs = {}
            for well in ensure_list(plate['Well']):
                row = create_row_col_label(well['Row'], plate['RowNamingConvention'])
                column = create_row_col_label(well['Column'], plate['ColumnNamingConvention'])
                rows.add(row)
                columns.add(column)
                label = f'{row}{column}'
                wells[label] = well['ID']
                image_refs[label] = {}
                for sample in ensure_list(well.get('WellSample')):
                    index = sample.get('Index', 0)
                    image_refs[label][str(index)] = sample['ImageRef']['ID']
                    if index not in fields:
                        fields.append(index)
            self.rows = sorted(rows)
            self.columns = list(columns)
            self.wells = list(wells.keys())
            self.fields = fields
            self.image_refs = image_refs
        else:
            self.name = image0.get('Name')
        if not self.name:
            self.name = get_filetitle(self.uri)
        self.name = str(self.name).rstrip('.tiff').rstrip('.tif').rstrip('.ome')
        pixels = image0.get('Pixels', {})
        self.dtype = np.dtype(pixels['Type'])
        if 'PhysicalSizeX' in pixels:
            pixel_size['x'] = convert_to_um(float(pixels.get('PhysicalSizeX')), pixels.get('PhysicalSizeXUnit'))
        if 'PhysicalSizeY' in pixels:
            pixel_size['y'] = convert_to_um(float(pixels.get('PhysicalSizeY')), pixels.get('PhysicalSizeYUnit'))
        if 'PhysicalSizeZ' in pixels:
            pixel_size['z'] = convert_to_um(float(pixels.get('PhysicalSizeZ')), pixels.get('PhysicalSizeZUnit'))
        plane = pixels.get('Plane')
        if plane:
            if 'PositionX' in plane:
                position['x'] = convert_to_um(float(plane.get('PositionX')), plane.get('PositionXUnit'))
            if 'PositionY' in plane:
                position['y'] = convert_to_um(float(plane.get('PositionY')), plane.get('PositionYUnit'))
            if 'PositionZ' in plane:
                position['z'] = convert_to_um(float(plane.get('PositionZ')), plane.get('PositionZUnit'))
        for channel0 in ensure_list(pixels.get('Channel')):
            channel = {}
            if 'Name' in channel0:
                channel['label'] = channel0['Name']
            if 'Color' in channel0:
                channel['color'] = int_to_rgba(channel0['Color'])
            channels.append(channel)
    else:
        self.is_plate = False
        if self.is_imagej:
            self.imagej_metadata = self.tiff.imagej_metadata
            pixel_size_unit = self.imagej_metadata.get('unit', '').encode().decode('unicode_escape')
            if 'scales' in self.imagej_metadata:
                for dim, scale in zip(['x', 'y'], self.imagej_metadata['scales'].split(',')):
                    scale = scale.strip()
                    if scale != '':
                        pixel_size[dim] = convert_to_um(float(scale), pixel_size_unit)
            if 'spacing' in self.imagej_metadata:
                pixel_size['z'] = convert_to_um(self.imagej_metadata['spacing'], pixel_size_unit)
        self.metadata = tags_to_dict(self.tiff.pages.first.tags)
        self.name = os.path.splitext(self.tiff.filename)[0]
        self.dtype = page.dtype
        res_unit = self.metadata.get('ResolutionUnit', '')
        if isinstance(res_unit, Enum):
            res_unit = res_unit.name
        res_unit = res_unit.lower()
        if res_unit == 'none':
            res_unit = ''
        if 'x' not in pixel_size:
            res0 = convert_rational_value(self.metadata.get('XResolution'))
            if res0 is not None and res0 != 0:
                pixel_size['x'] = convert_to_um(1 / res0, res_unit)
        if 'y' not in pixel_size:
            res0 = convert_rational_value(self.metadata.get('YResolution'))
            if res0 is not None and res0 != 0:
                pixel_size['y'] = convert_to_um(1 / res0, res_unit)
    self.pixel_size = pixel_size
    self.position = position
    self.channels = channels
    return self.metadata

is_rgb()

Check if the source is a RGB(A) image.

Source code in src/TiffSource.py
268
269
270
271
272
def is_rgb(self):
    """
    Check if the source is a RGB(A) image.
    """
    return self.is_photometric_rgb

is_screen()

Source code in src/TiffSource.py
173
174
def is_screen(self):
    return self.is_plate

convert_rational_value(value)

Converts a rational value tuple to a float.

Parameters:

Name Type Description Default
value tuple or None

Rational value.

required

Returns:

Type Description

float or None: Converted value.

Source code in src/TiffSource.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def convert_rational_value(value):
    """
    Converts a rational value tuple to a float.

    Args:
        value (tuple or None): Rational value as (numerator, denominator).

    Returns:
        float or None: Converted value; non-tuples pass through unchanged.
    """
    if not isinstance(value, tuple):
        return value
    numerator, denominator = value[0], value[1]
    # Equal numerator and denominator keeps the original numerator value
    # (preserving its type) rather than dividing.
    if numerator == denominator:
        return numerator
    return numerator / denominator

tags_to_dict(tags)

Converts TIFF tags to a dictionary.

Parameters:

Name Type Description Default
tags

TIFF tags object.

required

Returns:

Name Type Description
dict

Tag name-value mapping.

Source code in src/TiffSource.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def tags_to_dict(tags):
    """
    Converts TIFF tags to a dictionary.

    Args:
        tags: TIFF tags object.

    Returns:
        dict: Tag name-value mapping.
    """
    return {tag.name: tag.value for tag in tags.values()}

Timer

Timer

Bases: object

Context manager for timing code execution and logging the elapsed time.

Source code in src/Timer.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Timer(object):
    """
    Context manager for timing code execution and logging the elapsed time.
    """

    def __init__(self, title, auto_unit=True, verbose=True):
        """
        Initialize the Timer.

        Args:
            title (str): Description for the timed block.
            auto_unit (bool): Automatically select time unit (seconds/minutes/hours).
            verbose (bool): If True, log the elapsed time.
        """
        self.title = title
        self.auto_unit = auto_unit
        self.verbose = verbose

    def __enter__(self):
        """
        Start timing.

        Returns:
            Timer: This timer, so `with Timer(...) as t` binds the instance
            (previously returned None, making the `as` target useless).
        """
        self.ptime_start = time.process_time()
        self.time_start = time.time()
        return self

    def __exit__(self, type, value, traceback):
        """
        Stop timing and log the elapsed time.

        Args:
            type: Exception type, if any.
            value: Exception value, if any.
            traceback: Exception traceback, if any.
        """
        if self.verbose:
            ptime_end = time.process_time()
            time_end = time.time()
            pelapsed = ptime_end - self.ptime_start
            elapsed = time_end - self.time_start
            unit = 'seconds'
            # Promote the unit as long as the wall-clock time exceeds 60.
            if self.auto_unit and elapsed >= 60:
                pelapsed /= 60
                elapsed /= 60
                unit = 'minutes'
                if elapsed >= 60:
                    pelapsed /= 60
                    elapsed /= 60
                    unit = 'hours'
            logging.info(f'Time {self.title}: {elapsed:.1f} ({pelapsed:.1f}) {unit}')

auto_unit = auto_unit instance-attribute

title = title instance-attribute

verbose = verbose instance-attribute

__enter__()

Start timing.

Source code in src/Timer.py
25
26
27
28
29
30
def __enter__(self):
    """
    Start timing.
    """
    self.ptime_start = time.process_time()
    self.time_start = time.time()

__exit__(type, value, traceback)

Stop timing and log the elapsed time.

Parameters:

Name Type Description Default
type

Exception type, if any.

required
value

Exception value, if any.

required
traceback

Exception traceback, if any.

required
Source code in src/Timer.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __exit__(self, type, value, traceback):
    """
    Stop timing and log the elapsed time.

    Args:
        type: Exception type, if any.
        value: Exception value, if any.
        traceback: Exception traceback, if any.
    """
    if self.verbose:
        ptime_end = time.process_time()
        time_end = time.time()
        pelapsed = ptime_end - self.ptime_start
        elapsed = time_end - self.time_start
        unit = 'seconds'
        if self.auto_unit and elapsed >= 60:
            pelapsed /= 60
            elapsed /= 60
            unit = 'minutes'
            if elapsed >= 60:
                pelapsed /= 60
                elapsed /= 60
                unit = 'hours'
        logging.info(f'Time {self.title}: {elapsed:.1f} ({pelapsed:.1f}) {unit}')

__init__(title, auto_unit=True, verbose=True)

Initialize the Timer.

Parameters:

Name Type Description Default
title str

Description for the timed block.

required
auto_unit bool

Automatically select time unit (seconds/minutes/hours).

True
verbose bool

If True, log the elapsed time.

True
Source code in src/Timer.py
12
13
14
15
16
17
18
19
20
21
22
23
def __init__(self, title, auto_unit=True, verbose=True):
    """
    Initialize the Timer.

    Args:
        title (str): Description for the timed block.
        auto_unit (bool): Automatically select time unit (seconds/minutes/hours).
        verbose (bool): If True, log the elapsed time.
    """
    self.title = title
    self.auto_unit = auto_unit
    self.verbose = verbose

WindowScanner

WindowScanner

Computes quantile-based min/max window for image channels.

Source code in src/WindowScanner.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class WindowScanner:
    """
    Computes quantile-based min/max window for image channels.
    """

    def __init__(self):
        """Initialize with empty per-channel min/max accumulators."""
        self.mins = []
        self.maxs = []

    def process(self, data, dim_order, min_quantile=0.01, max_quantile=0.99):
        """
        Processes image data to compute min/max quantiles for each channel.

        Args:
            data (ndarray): Image data.
            dim_order (str): Dimension order string.
            min_quantile (float): Lower quantile.
            max_quantile (float): Upper quantile.
        """
        # Reduce over every non-channel axis present in the data (t, z, then y/x).
        reduce_axes = [dim_order.index(dim) for dim in 'tz' if dim in dim_order]
        reduce_axes += [dim_order.index('y'), dim_order.index('x')]
        mins, maxs = np.quantile(data, q=[min_quantile, max_quantile], axis=reduce_axes)
        if len(self.mins) > 0:
            # Merge with previously processed data: keep the global extrema.
            mins = np.min([mins, self.mins], axis=0)
            maxs = np.max([maxs, self.maxs], axis=0)
        self.mins = mins
        self.maxs = maxs

    def get_window(self):
        """
        Returns the computed min/max window for channels.

        Returns:
            tuple: (list of per-channel minima, list of per-channel maxima)
        """
        return np.asarray(self.mins).tolist(), np.asarray(self.maxs).tolist()

maxs = [] instance-attribute

mins = [] instance-attribute

__init__()

Initialize WindowScanner.

Source code in src/WindowScanner.py
 9
10
11
12
13
14
def __init__(self):
    """
    Initialize WindowScanner.
    """
    # Per-channel minimum and maximum accumulators, filled by process().
    self.mins, self.maxs = [], []

get_window()

Returns the computed min/max window for channels.

Returns:

Name Type Description
tuple

(min list, max list)

Source code in src/WindowScanner.py
41
42
43
44
45
46
47
48
def get_window(self):
    """
    Returns the computed min/max window for channels.

    Returns:
        tuple: (list of per-channel minima, list of per-channel maxima)
    """
    # np.asarray handles both the initial plain lists and ndarray accumulators.
    return np.asarray(self.mins).tolist(), np.asarray(self.maxs).tolist()

process(data, dim_order, min_quantile=0.01, max_quantile=0.99)

Processes image data to compute min/max quantiles for each channel.

Parameters:

Name Type Description Default
data ndarray

Image data.

required
dim_order str

Dimension order string.

required
min_quantile float

Lower quantile.

0.01
max_quantile float

Upper quantile.

0.99
Source code in src/WindowScanner.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def process(self, data, dim_order, min_quantile=0.01, max_quantile=0.99):
    """
    Processes image data to compute min/max quantiles for each channel.

    Args:
        data (ndarray): Image data.
        dim_order (str): Dimension order string.
        min_quantile (float): Lower quantile.
        max_quantile (float): Upper quantile.
    """
    # Reduce over every non-channel axis present in the data (t, z, then y/x).
    reduce_axes = [dim_order.index(dim) for dim in 'tz' if dim in dim_order]
    reduce_axes += [dim_order.index('y'), dim_order.index('x')]
    mins, maxs = np.quantile(data, q=[min_quantile, max_quantile], axis=reduce_axes)
    if len(self.mins) > 0:
        # Merge with previously processed data: keep the global extrema.
        mins = np.min([mins, self.mins], axis=0)
        maxs = np.max([maxs, self.maxs], axis=0)
    self.mins = mins
    self.maxs = maxs

color_conversion

hexrgb_to_rgba(hexrgb)

Source code in src/color_conversion.py
21
22
23
24
25
26
def hexrgb_to_rgba(hexrgb: str) -> list:
    """
    Convert a hex RGB(A) color string (e.g. '#FF0000') to a list of 0-1 floats.

    Args:
        hexrgb (str): Hex color string, with or without a leading '#'.

    Returns:
        list: [r, g, b, a] components in the 0-1 range.
    """
    hexrgb = hexrgb.lstrip('#')
    if len(hexrgb) == 6:
        hexrgb += 'FF'  # add alpha
    # int(..., 16) instead of eval('0x' + ...): eval on an input string is unsafe.
    rgba = int_to_rgba(int(hexrgb, 16))
    return rgba

int_to_rgba(intrgba)

Source code in src/color_conversion.py
3
4
5
6
7
8
def int_to_rgba(intrgba: int) -> list:
    """
    Convert a (possibly signed) packed 32-bit RGBA integer to 0-1 floats.
    """
    raw = intrgba.to_bytes(4, byteorder="big", signed=intrgba < 0)
    rgba = [component / 255 for component in raw]
    if rgba[-1] == 0:
        # A zero alpha would be fully transparent; default to opaque.
        rgba[-1] = 1
    return rgba

rgba_to_hexrgb(rgba)

Source code in src/color_conversion.py
16
17
18
def rgba_to_hexrgb(rgba: list) -> str:
    """
    Convert the RGB part of a 0-1 float RGBA list to a hex string like 'FF0000'.
    """
    return ''.join(f'{int(component * 255):02X}' for component in rgba[:3])

rgba_to_int(rgba)

Source code in src/color_conversion.py
11
12
13
def rgba_to_int(rgba: list) -> int:
    """
    Pack a 0-1 float RGBA list into a signed 32-bit big-endian integer.
    """
    packed = bytes(int(component * 255) for component in rgba)
    return int.from_bytes(packed, byteorder="big", signed=True)

helper

create_incucyte_source(filename, plate_id=None)

Create an IncucyteSource object for a specific plate.

Parameters:

Name Type Description Default
filename str

Path to the Incucyte archive folder or .icarch file.

required
plate_id str

Specific plate ID to process. If None, uses the first available plate if multiple plates exist.

None

Returns:

Name Type Description
IncucyteSource

Source object for the specified plate.

Raises:

Type Description
ValueError

If the path is not a valid Incucyte archive.

Source code in src/helper.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def create_incucyte_source(filename, plate_id=None):
    """
    Create an IncucyteSource object for a specific plate.

    Args:
        filename (str): Path to the Incucyte archive folder or .icarch file.
        plate_id (str, optional): Specific plate ID to process. If None,
                                 uses the first available plate if multiple
                                 plates exist.

    Returns:
        IncucyteSource: Source object for the specified plate.

    Raises:
        ValueError: If the path is not a valid Incucyte archive.
    """
    # If it's an .icarch file, use its parent folder
    if os.path.isfile(filename) and filename.lower().endswith('.icarch'):
        archive_folder = os.path.dirname(filename)
    elif os.path.isdir(filename):
        archive_folder = filename
    else:
        # Include the offending path so the error is actionable.
        raise ValueError(
            f'Invalid Incucyte archive path. Expected folder or .icarch '
            f'file: {filename}'
        )

    from src.IncucyteSource import IncucyteSource
    return IncucyteSource(archive_folder, plate_id=plate_id)

create_source(filename, **kwargs)

Create an image source object based on the input file extension.

Parameters:

Name Type Description Default
filename str

Path to the input file or Incucyte .icarch file.

required
**kwargs

Source-specific parameters (e.g., plate_id for Incucyte).

{}

Returns:

Name Type Description
ImageSource

Source object for the input file.

Raises:

Type Description
ValueError

If the file format is unsupported.

Source code in src/helper.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def create_source(filename, **kwargs):
    """
    Create an image source object based on the input file extension.

    Args:
        filename (str): Path to the input file or Incucyte .icarch file.
        **kwargs: Source-specific parameters (e.g., plate_id for Incucyte).

    Returns:
        ImageSource: Source object for the input file.

    Raises:
        ValueError: If the file format is unsupported.
    """
    input_ext = os.path.splitext(filename)[1].lower()

    if input_ext == '.db':
        from src.ImageDbSource import ImageDbSource
        source = ImageDbSource(filename)
    elif input_ext == '.icarch':
        # Incucyte archive file - use parent folder for source
        if not os.path.isfile(filename):
            # Include the offending path so the error is actionable.
            raise ValueError(
                f'Incucyte archive file not found: {filename}'
            )
        archive_folder = os.path.dirname(filename)
        # Verify EssenFiles folder exists
        essen_path = os.path.join(archive_folder, 'EssenFiles')
        if not os.path.isdir(essen_path):
            raise ValueError(
                f'EssenFiles folder not found in: {archive_folder}. '
                f'Expected Incucyte archive structure.'
            )
        from src.IncucyteSource import IncucyteSource
        # Pass kwargs to IncucyteSource (e.g., plate_id)
        source = IncucyteSource(archive_folder, **kwargs)
    elif input_ext == '.isyntax':
        from src.ISyntaxSource import ISyntaxSource
        source = ISyntaxSource(filename)
    elif input_ext == '.mrxs':
        from src.MiraxSource import MiraxSource
        source = MiraxSource(filename)
    elif '.zar' in input_ext:
        from src.OmeZarrSource import OmeZarrSource
        source = OmeZarrSource(filename)
    elif '.tif' in input_ext or input_ext == '.ome':
        from src.TiffSource import TiffSource
        source = TiffSource(filename)
    else:
        raise ValueError(f'Unsupported input file format: {input_ext}')
    return source

create_writer(output_format, verbose=False)

Create a writer object and output extension based on the output format.

Parameters:

Name Type Description Default
output_format str

Output format string.

required
verbose bool

If True, enables verbose output.

False

Returns:

Name Type Description
tuple

(writer object, output file extension)

Raises:

Type Description
ValueError

If the output format is unsupported.

Source code in src/helper.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def create_writer(output_format, verbose=False):
    """
    Create a writer object and output extension based on the output format.

    Args:
        output_format (str): Output format string.
        verbose (bool): If True, enables verbose output.

    Returns:
        tuple: (writer object, output file extension)

    Raises:
        ValueError: If the output format is unsupported.
    """
    if 'zar' in output_format:
        # Zarr v3 pairs with OME-NGFF 0.5; v2 pairs with 0.4.
        zarr_version, ome_version = (3, '0.5') if '3' in output_format else (2, '0.4')
        from src.OmeZarrWriter import OmeZarrWriter
        return OmeZarrWriter(zarr_version=zarr_version, ome_version=ome_version, verbose=verbose), '.ome.zarr'
    if 'tif' in output_format:
        from src.OmeTiffWriter import OmeTiffWriter
        return OmeTiffWriter(verbose=verbose), '.ome.tiff'
    raise ValueError(f'Unsupported output format: {output_format}')

get_incucyte_plates(filename)

Get all available plate IDs from an Incucyte archive.

Parameters:

Name Type Description Default
filename str

Path to the Incucyte archive folder or .icarch file.

required

Returns:

Name Type Description
list

List of plate IDs (strings) found in the archive.

Raises:

Type Description
ValueError

If the path is not a valid Incucyte archive.

Source code in src/helper.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_incucyte_plates(filename):
    """
    Get all available plate IDs from an Incucyte archive.

    Args:
        filename (str): Path to the Incucyte archive folder or .icarch file.

    Returns:
        list: List of plate IDs (strings) found in the archive.

    Raises:
        ValueError: If the path is not a valid Incucyte archive.
    """
    # If it's an .icarch file, use its parent folder
    if os.path.isfile(filename) and filename.lower().endswith('.icarch'):
        archive_folder = os.path.dirname(filename)
    elif os.path.isdir(filename):
        archive_folder = filename
    else:
        # Include the offending path so the error is actionable.
        raise ValueError(
            f'Invalid Incucyte archive path. Expected folder or .icarch '
            f'file: {filename}'
        )

    from src.IncucyteSource import IncucyteSource
    return IncucyteSource.get_available_plates(archive_folder)

ome_tiff_util

create_binaryonly_metadata(metadata_filename, companion_uuid)

Source code in src/ome_tiff_util.py
132
133
134
135
136
137
def create_binaryonly_metadata(metadata_filename, companion_uuid):
    """
    Build a BinaryOnly OME document pointing at a companion metadata file.

    Returns:
        tuple: (serialized OME-XML string, uuid of this document)
    """
    ome = OME()
    # Creator string identifies this writer and its version in the XML output.
    ome.creator = f'nl.biomero.OmeTiffWriter {VERSION}'
    ome.uuid = create_uuid()
    ome.binary_only = OME.BinaryOnly(metadata_file=metadata_filename, uuid=companion_uuid)
    return to_xml(ome), ome.uuid

create_image_metadata(source, image_name, dim_order='tczyx', image_uuid=None, image_filename=None)

Source code in src/ome_tiff_util.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def create_image_metadata(source, image_name, dim_order='tczyx', image_uuid=None, image_filename=None):
    """
    Build an ome-types Image element (pixels, channels, tiff data) for a source.

    Args:
        source: Image source providing shape, dtype, pixel size and channel info.
        image_name (str): Name for the OME Image element.
        dim_order (str): Dimension order of the written data.
        image_uuid (str, optional): UUID of the TIFF file holding the pixel data.
        image_filename (str, optional): File name of the TIFF holding the pixel data.

    Returns:
        Image: Populated ome-types Image element with renumbered channel ids.
    """
    # Sizes default to 1 for dimensions the source does not have.
    # NOTE(review): membership is tested via get_dim_order() but indexing uses
    # source.dim_order - confirm these are always identical.
    t, c, z, y, x = [source.get_shape()[source.dim_order.index(dim)] if dim in source.get_dim_order() else 1
                     for dim in 'tczyx']
    pixel_size = source.get_pixel_size_um()
    channels = source.get_channels()
    if source.is_rgb():
        # RGB data is stored as a single channel with 3 samples per pixel.
        ome_channels = [Channel(name='rgb', samples_per_pixel=3)]
    elif len(channels) < c:
        # Too little channel info: fall back to numbered channel names.
        ome_channels = [Channel(name=f'{channeli}', samples_per_pixel=1) for channeli in range(c)]
    else:
        ome_channels = []
        for channeli, channel in enumerate(channels):
            ome_channel = Channel()
            # Channel dicts may use either lowercase or capitalized keys.
            ome_channel.name = channel.get('label', channel.get('Name', f'{channeli}'))
            ome_channel.samples_per_pixel = 1
            color = channel.get('color', channel.get('Color'))
            if color is not None:
                ome_channel.color = Color(rgba_to_int(color))
            ome_channels.append(ome_channel)

    tiff_data = TiffData()
    tiff_data.uuid = TiffData.UUID(value=image_uuid, file_name=image_filename)

    pixels = Pixels(
        dimension_order=Pixels_DimensionOrder(dim_order[::-1].upper()),
        type=PixelType(str(source.get_dtype())),
        channels=ome_channels,
        size_t=t, size_c=c, size_z=z, size_y=y, size_x=x,
        tiff_data_blocks=[tiff_data]
    )
    # Physical pixel sizes are attached only for the dimensions the source reports.
    if 'x' in pixel_size:
        pixels.physical_size_x = pixel_size['x']
        pixels.physical_size_x_unit = UnitsLength.MICROMETER
    if 'y' in pixel_size:
        pixels.physical_size_y = pixel_size['y']
        pixels.physical_size_y_unit = UnitsLength.MICROMETER
    if 'z' in pixel_size:
        pixels.physical_size_z = pixel_size['z']
        pixels.physical_size_z_unit = UnitsLength.MICROMETER

    image = Image(name=image_name, pixels=pixels)
    # Renumber channel ids to match this Pixels element's auto-assigned index.
    index = pixels.id.split(':')[1]
    for channeli, channel in enumerate(pixels.channels):
        channel.id = f'Channel:{index}:{channeli}'
    return image

create_metadata(source, dim_order='tczyx', uuid=None, image_uuids=None, image_filenames=None, wells=None)

Source code in src/ome_tiff_util.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def create_metadata(source, dim_order='tczyx', uuid=None, image_uuids=None, image_filenames=None, wells=None):
    """
    Build the full OME-XML document for a source (plate layout or single image).

    Args:
        source: Image source; is_screen() selects plate vs single-image metadata.
        dim_order (str): Dimension order of the written data.
        uuid (str, optional): UUID for this OME document; generated when None.
        image_uuids (list, optional): Per-image UUIDs, one per well/field image.
        image_filenames (list, optional): Per-image file names, one per well/field image.
        wells (list, optional): Well ids to include; defaults to all source wells.

    Returns:
        str: The OME metadata serialized as XML.
    """
    ome = OME()
    if uuid is None:
        uuid = create_uuid()
    ome.uuid = uuid
    ome.creator = f'nl.biomero.OmeTiffWriter {VERSION}'

    if source.is_screen():
        if wells is None:
            wells = source.get_wells()

        nrows, row_type = get_row_col_len_type(source.get_rows())
        ncols, col_type = get_row_col_len_type(source.get_columns())

        plate = Plate()
        plate.name = source.get_name()
        plate.rows = nrows
        plate.columns = ncols
        plate.row_naming_convention = row_type
        plate.column_naming_convention = col_type

        # image_index runs over all (well, field) combinations in order.
        image_index = 0
        for well_id in wells:
            row, col = split_well_name(well_id)
            row_index = get_row_col_index(row)
            col_index = get_row_col_index(col)
            well = Well(row=row_index, column=col_index)
            well.id = f'Well:{row_index}:{col_index}'
            for field in source.get_fields():
                sample = WellSample(index=image_index)
                sample.id = f'WellSample:{row_index}:{col_index}:{field}'
                # NOTE(review): position is looked up per well, not per field -
                # confirm all fields of a well share one position.
                position = source.get_position_um(well_id)
                if 'x' in position:
                    sample.position_x = position['x']
                    sample.position_x_unit = UnitsLength.MICROMETER
                if 'y' in position:
                    sample.position_y = position['y']
                    sample.position_y_unit = UnitsLength.MICROMETER

                image_name = f'Well {well_id}, Field #{int(field) + 1}'
                image = create_image_metadata(source,
                                              image_name,
                                              dim_order,
                                              image_uuids[image_index],
                                              image_filenames[image_index])
                ome.images.append(image)

                image_ref = ImageRef(id=image.id)   # assign id at instantiation to avoid auto sequence increment
                sample.image_ref = image_ref
                well.well_samples.append(sample)

                image_index += 1

            plate.wells.append(well)

        ome.plates = [plate]
    else:
        # Single-image source: one Image element sharing the document uuid.
        ome.images = [create_image_metadata(source, source.get_name(), dim_order, ome.uuid, image_filenames[0])]

    return to_xml(ome)

create_resolution_metadata(source)

Source code in src/ome_tiff_util.py
167
168
169
170
171
def create_resolution_metadata(source):
    """
    Derive TIFF resolution values (pixels per centimeter) from the source pixel size.

    Returns:
        tuple: ([x_resolution, y_resolution], resolution unit name)
    """
    pixel_size_um = source.get_pixel_size_um()
    # 1 cm = 1e4 um, so pixels-per-cm = 1e4 / pixel-size-in-um.
    resolution = [1e4 / pixel_size_um['x'], 1e4 / pixel_size_um['y']]
    return resolution, 'CENTIMETER'

create_row_col_label(index, naming_convention)

Source code in src/ome_tiff_util.py
159
160
161
162
163
164
def create_row_col_label(index, naming_convention):
    """
    Convert a 0-based row/column index to its label ('A', 'B', ... or '1', '2', ...).
    """
    use_letters = naming_convention.lower() == NamingConvention.LETTER.name.lower()
    return chr(ord('A') + index) if use_letters else str(index + 1)

create_uuid()

Source code in src/ome_tiff_util.py
19
20
def create_uuid():
    """Return a freshly generated random UUID as a URN string."""
    return 'urn:uuid:' + str(uuid.uuid4())

get_row_col_index(label)

Source code in src/ome_tiff_util.py
151
152
153
154
155
156
def get_row_col_index(label):
    """
    Convert a row/column label ('A', 'b', '3', ...) to a 0-based index.
    """
    if label.isdigit():
        return int(label) - 1
    # Letter labels are case-insensitive: 'A'/'a' -> 0, 'B'/'b' -> 1, ...
    return ord(label.upper()) - ord('A')

get_row_col_len_type(labels)

Source code in src/ome_tiff_util.py
140
141
142
143
144
145
146
147
148
def get_row_col_len_type(labels):
    """
    Return the label count implied by the highest label, plus the naming convention.

    Returns:
        tuple: (number of rows/columns, NamingConvention member)
    """
    nlen = 1 + max(get_row_col_index(label) for label in labels)
    all_numeric = all(label.isdigit() for label in labels)
    naming_convention = NamingConvention.NUMBER if all_numeric else NamingConvention.LETTER
    return nlen, naming_convention

metadata_to_dict(xml_metadata)

Source code in src/ome_tiff_util.py
12
13
14
15
16
def metadata_to_dict(xml_metadata):
    """
    Parse OME-XML into a dict, unwrapping the top-level 'OME' key when present.
    """
    metadata = xml2dict(xml_metadata)
    return metadata['OME'] if 'OME' in metadata else metadata

ome_zarr_util

create_axes_metadata(dimension_order)

Create axes metadata for OME-Zarr from dimension order.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required

Returns:

Name Type Description
list

List of axis metadata dictionaries.

Source code in src/ome_zarr_util.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def create_axes_metadata(dimension_order):
    """
    Create axes metadata for OME-Zarr from dimension order.

    Args:
        dimension_order (str): String of dimension characters.

    Returns:
        list: List of axis metadata dictionaries.
    """
    axes = []
    for dimension in dimension_order:
        if dimension == 't':
            axis_type, unit = 'time', 'millisecond'
        elif dimension == 'c':
            axis_type, unit = 'channel', None   # channel axes carry no unit
        else:
            axis_type, unit = 'space', 'micrometer'
        axis = {'name': dimension, 'type': axis_type}
        # (Removed dead `unit != ''` check: unit is only ever None or a real name.)
        if unit is not None:
            axis['unit'] = unit
        axes.append(axis)
    return axes

create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version)

Create channel metadata for OME-Zarr.

Parameters:

Name Type Description Default
dtype

Numpy dtype of image data.

required
channels list

List of channel dicts.

required
nchannels int

Number of channels.

required
is_rgb bool

True if the channels represent RGB(A) samples.

required
window tuple

Min/max window values.

required
ome_version str

OME-Zarr version.

required

Returns:

Name Type Description
dict

Channel metadata dictionary.

Source code in src/ome_zarr_util.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def create_channel_metadata(dtype, channels, nchannels, is_rgb, window, ome_version):
    """
    Create channel metadata for OME-Zarr.

    Args:
        dtype: Numpy dtype of image data.
        channels (list): List of channel dicts.
        nchannels (int): Number of channels.
        is_rgb (bool): True if the channels represent RGB(A) samples.
        window (tuple): Min/max window values.
        ome_version (str): OME-Zarr version.

    Returns:
        dict: Channel metadata dictionary.
    """
    if len(channels) < nchannels:
        # Synthesize channel info when the source provides fewer entries than channels.
        labels = []
        colors = []
        if is_rgb and nchannels in (3, 4):
            labels = ['Red', 'Green', 'Blue']
            colors = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
        if is_rgb and nchannels == 4:
            labels += ['Alpha']
            colors += [(1, 1, 1)]
        # NOTE(review): for non-RGB sources this leaves the channel list empty,
        # unlike create_image_metadata which numbers the channels - confirm intended.
        channels = [{'label': label, 'color': color} for label, color in zip(labels, colors)]

    omezarr_channels = []
    starts, ends = window
    for channeli, channel in enumerate(channels):
        omezarr_channel = {'label': channel.get('label', channel.get('Name', f'{channeli}')), 'active': True}
        color = channel.get('color', channel.get('Color'))
        if color is not None:
            omezarr_channel['color'] = rgba_to_hexrgb(color)
        # Full representable range of the dtype (renamed to avoid shadowing
        # the builtins min/max).
        if dtype.kind == 'f':
            range_min, range_max = 0, 1
        else:
            info = np.iinfo(dtype)
            range_min, range_max = info.min, info.max
        if starts and ends:
            start, end = starts[channeli], ends[channeli]
        else:
            start, end = range_min, range_max
        omezarr_channel['window'] = {'min': range_min, 'max': range_max, 'start': start, 'end': end}
        omezarr_channels.append(omezarr_channel)

    metadata = {
        'version': ome_version,
        'channels': omezarr_channels,
    }
    return metadata

create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None)

Create transformation metadata (scale and translation) for OME-Zarr.

Parameters:

Name Type Description Default
dimension_order str

String of dimension characters.

required
pixel_size_um dict

Pixel size in micrometers per dimension.

required
scale float

Scaling factor.

required
translation_um dict

Translation in micrometers per dimension.

None

Returns:

Name Type Description
list

List of transformation metadata dictionaries.

Source code in src/ome_zarr_util.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_transformation_metadata(dimension_order, pixel_size_um, scale, translation_um=None):
    """
    Create transformation metadata (scale and translation) for OME-Zarr.

    Args:
        dimension_order (str): String of dimension characters.
        pixel_size_um (dict): Pixel size in micrometers per dimension.
        scale (float): Scaling factor.
        translation_um (dict, optional): Translation in micrometers per dimension.

    Returns:
        list: List of transformation metadata dictionaries.
    """
    scales = []
    translations = []
    for dimension in dimension_order:
        size = pixel_size_um.get(dimension, 1)
        if size == 0:
            size = 1  # guard against an unset (zero) pixel size
        if dimension in 'xy':
            size /= scale
        scales.append(size)

        if translation_um is not None:
            offset = translation_um.get(dimension, 0)
            if dimension in 'xy':
                offset *= scale
            translations.append(offset)

    metadata = [{'type': 'scale', 'scale': scales}]
    if translation_um is not None:
        metadata.append({'type': 'translation', 'translation': translations})
    return metadata

scale_dimensions_dict(shape0, scale)

Scale x and y dimensions in a shape dictionary.

Parameters:

Name Type Description Default
shape0 dict

Original shape dictionary.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
dict

Scaled shape dictionary.

Source code in src/ome_zarr_util.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def scale_dimensions_dict(shape0, scale):
    """
    Scale x and y dimensions in a shape dictionary.

    Args:
        shape0 (dict): Original shape dictionary.
        scale (float): Scaling factor.

    Returns:
        dict: Scaled shape dictionary (the original object when scale == 1).
    """
    if scale == 1:
        return shape0
    return {dimension: int(size * scale) if dimension[0] in 'xy' else size
            for dimension, size in shape0.items()}

scale_dimensions_xy(shape0, dimension_order, scale)

Scale x and y dimensions in a shape tuple.

Parameters:

Name Type Description Default
shape0 tuple

Original shape.

required
dimension_order str

String of dimension characters.

required
scale float

Scaling factor.

required

Returns:

Name Type Description
list

Scaled shape.

Source code in src/ome_zarr_util.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def scale_dimensions_xy(shape0, dimension_order, scale):
    """
    Scale x and y dimensions in a shape tuple.

    Args:
        shape0 (tuple): Original shape.
        dimension_order (str): String of dimension characters.
        scale (float): Scaling factor.

    Returns:
        list: Scaled shape (the original object when scale == 1).
    """
    if scale == 1:
        return shape0
    return [int(size * scale) if dimension[0] in 'xy' else size
            for size, dimension in zip(shape0, dimension_order)]

parameters

PYRAMID_DOWNSCALE = 2 module-attribute

PYRAMID_LEVELS = 6 module-attribute

RETRY_ATTEMPTS = 3 module-attribute

TIFF_COMPRESSION = 'LZW' module-attribute

TILE_SIZE = 1024 module-attribute

VERSION = 'v0.1.10' module-attribute

ZARR_CHUNK_SIZE = TILE_SIZE module-attribute

ZARR_SHARD_MULTIPLIER = 10 module-attribute

util

convert_dotnet_ticks_to_datetime(net_ticks)

Source code in src/util.py
137
138
def convert_dotnet_ticks_to_datetime(net_ticks):
    """
    Convert .NET ticks (100 ns units since 0001-01-01) to a datetime.
    """
    # One tick is 100 ns; floor-divide by 10 to get whole microseconds.
    microseconds = net_ticks // 10
    return datetime(year=1, month=1, day=1) + timedelta(microseconds=microseconds)

convert_to_um(value, unit)

Source code in src/util.py
184
185
186
187
188
189
190
191
192
def convert_to_um(value, unit):
    """
    Convert a length value in the given unit to micrometers.

    Unknown units are treated as micrometers (factor 1).
    """
    factors_to_um = {
        'nm': 1e-3,
        'µm': 1, 'um': 1, 'micrometer': 1, 'micron': 1,
        'mm': 1e3, 'millimeter': 1e3,
        'cm': 1e4, 'centimeter': 1e4,
        'm': 1e6, 'meter': 1e6,
    }
    return value * factors_to_um.get(unit, 1)

ensure_list(item)

Source code in src/util.py
 7
 8
 9
10
def ensure_list(item):
    """
    Wrap a scalar in a list; pass lists and tuples through unchanged.
    """
    return item if isinstance(item, (list, tuple)) else [item]

get_filetitle(filename)

Source code in src/util.py
68
69
def get_filetitle(filename):
    """
    Return the file name without its directory or final extension.
    """
    root, _ = os.path.splitext(filename)
    return os.path.basename(root)

get_level_from_scale(source_scales, target_scale=1)

Source code in src/util.py
54
55
56
57
58
59
60
61
def get_level_from_scale(source_scales, target_scale=1):
    """
    Find a pyramid level for the target scale.

    Returns:
        tuple: (level index, residual scale to apply to that level's data).
               An exact match (within tolerance) yields a residual of 1;
               otherwise the last level at or below the target is used.
    """
    best = (0, target_scale)
    for level, level_scale in enumerate(source_scales):
        if np.isclose(level_scale, target_scale, rtol=1e-4):
            return level, 1
        if level_scale <= target_scale:
            best = (level, target_scale / level_scale)
    return best

get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size)

Source code in src/util.py
39
40
41
42
43
44
45
46
47
48
49
50
51
def get_numpy_data(data, dim_order, t, c, z, y, x, y_size, x_size):
    """
    Extract a 2D (y/x) region at the given t/c/z coordinates from an ndarray.

    The y and x axes are sliced to the requested window; t/c/z axes (when
    present in dim_order) are indexed and thus dropped from the result.
    """
    index_values = {'t': t, 'c': c, 'z': z}
    selector = []
    for dimension in dim_order:
        if dimension == 'y':
            selector.append(slice(y, y + y_size))
        elif dimension == 'x':
            selector.append(slice(x, x + x_size))
        else:
            # Unknown dimensions are passed through whole.
            selector.append(index_values.get(dimension, slice(None)))
    return data[tuple(selector)]

get_rows_cols_plate(nwells)

Source code in src/util.py
122
123
124
125
126
127
128
129
130
131
132
133
134
def get_rows_cols_plate(nwells):
    """
    Return row and column labels for a standard well plate size.

    Args:
        nwells (int): Total wells (6, 12, 24, 48, 96 or 384).

    Returns:
        tuple: (row letter labels, column number string labels)

    Raises:
        KeyError: If nwells is not a standard plate size.
    """
    layouts = {6: (2, 3), 12: (3, 4), 24: (4, 6), 48: (6, 8), 96: (8, 12), 384: (16, 24)}
    nrows, ncols = layouts[nwells]
    row_labels = [chr(ord('A') + row) for row in range(nrows)]
    col_labels = [str(col + 1) for col in range(ncols)]
    return row_labels, col_labels

pad_leading_zero(input_string, num_digits=2)

Source code in src/util.py
104
105
106
107
108
109
110
111
112
113
114
def pad_leading_zero(input_string, num_digits=2):
    """
    Zero-pad a number, or the column part of a well name (e.g. 'A1' -> 'A01').
    """
    output = str(input_string)
    # A leading non-digit means this is a well name like 'A1'.
    is_well = not output[0].isdigit()
    row = ''
    if is_well:
        row, col = split_well_name(output, remove_leading_zeros=True)
        output = str(col)
    return row + output.zfill(num_digits)

print_dict(value, tab=0, max_len=250, bullet=False)

Source code in src/util.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def print_dict(value, tab=0, max_len=250, bullet=False):
    """
    Render a nested dict/list structure as an indented multi-line string.

    Args:
        value: Dict, list element, or scalar to render.
        tab (int): Current indentation depth (in tabs).
        max_len (int): Maximum scalar value length before truncation with '...'.
        bullet (bool): If True, prefix the first key of this dict with '-'.

    Returns:
        str: Formatted string (starts with a newline per key for dict input).
    """
    s = ''
    if isinstance(value, dict):
        for key, subvalue in value.items():
            s += '\n'
            if bullet:
                s += '-'
                bullet = False  # only the first key of a list item gets the bullet
            s += '\t' * tab + str(key) + ': '
            if isinstance(subvalue, dict):
                s += print_dict(subvalue, tab+1)
            elif isinstance(subvalue, list):
                for v in subvalue:
                    s += print_dict(v, tab+1, bullet=True)
            else:
                subvalue = str(subvalue)
                if len(subvalue) > max_len:
                    subvalue = subvalue[:max_len] + '...'  # truncate long values
                s += subvalue
    else:
        s += str(value) + ' '
    return s

print_hbytes(nbytes)

Source code in src/util.py
219
220
221
222
223
224
225
226
227
228
229
230
def print_hbytes(nbytes):
    """
    Format a byte count as a human-readable string (e.g. 2048 -> '2.0KB').

    Args:
        nbytes (int): Number of bytes.

    Returns:
        str: Value scaled by powers of 1024 with a unit prefix.
    """
    exps = ['', 'K', 'M', 'G', 'T', 'P', 'E']
    div = 1024
    exp = 0
    # Use >= so that exactly 1024 bytes reports as '1.0KB' rather than '1024.0B'.
    while nbytes >= div:
        nbytes /= div
        exp += 1
    if exp < len(exps):
        e = exps[exp]
    else:
        e = f'e{exp * 3}'  # fallback beyond the largest named prefix
    return f'{nbytes:.1f}{e}B'

redimension_data(data, old_order, new_order, **indices)

Source code in src/util.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def redimension_data(data, old_order, new_order, **indices):
    """
    Convert an ndarray from old_order to new_order, dropping and adding axes.

    Dropped dimensions are indexed using the optional keyword values
    (e.g. t=0, z=0; default 0); new dimensions are inserted with size 1.
    """
    if new_order == old_order:
        return data

    result = data
    current_order = old_order
    # Drop dimensions not present in the target order by indexing them.
    for dim in old_order:
        if dim not in new_order:
            axis = current_order.index(dim)
            result = np.take(result, indices=indices.get(dim, 0), axis=axis)
            current_order = current_order.replace(dim, '')
    # Insert missing dimensions at the front with length 1.
    for dim in new_order:
        if dim not in current_order:
            result = np.expand_dims(result, 0)
            current_order = dim + current_order
    # Reorder axes to match the target order.
    source_axes = [current_order.index(dim) for dim in new_order]
    return np.moveaxis(result, source_axes, range(len(new_order)))

split_well_name(well_name, remove_leading_zeros=True, col_as_int=False)

Source code in src/util.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def split_well_name(well_name, remove_leading_zeros=True, col_as_int=False):
    """Split a well name like 'A01' into its row letters and column part.

    Args:
        well_name: name in row-letters + column-digits form (e.g. 'A1', 'B02').
        remove_leading_zeros: normalize the column via int conversion.
        col_as_int: return the column as an int instead of a string.

    Returns:
        (row, col) tuple; col is a str unless col_as_int is True.

    Raises:
        ValueError: if the name does not match the expected pattern.
    """
    match = re.search(r'(\D+)(\d+)', well_name)
    if match is None:
        raise ValueError(f"Invalid well name format: {well_name}. Expected format like 'A1', 'B2', etc.")
    row, col = match.groups()
    if col_as_int or remove_leading_zeros:
        # the regex guarantees digits, so this conversion cannot fail
        col = int(col)
    if not col_as_int:
        col = str(col)
    return row, col

splitall(path)

Source code in src/util.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def splitall(path):
    """Split *path* into a list of all of its components."""
    parts = []
    remainder = path
    while True:
        head, tail = os.path.split(remainder)
        if head == remainder:
            # absolute root: os.path.split keeps returning it unchanged
            parts.insert(0, head)
            break
        if tail == remainder:
            # relative path fully consumed
            parts.insert(0, tail)
            break
        parts.insert(0, tail)
        remainder = head
    return parts

strip_leading_zeros(well_name)

Source code in src/util.py
117
118
119
def strip_leading_zeros(well_name):
    """Return *well_name* with leading zeros removed from its column, e.g. 'A01' -> 'A1'."""
    row, col = split_well_name(well_name, remove_leading_zeros=True)
    return row + str(col)

validate_filename(filename)

Source code in src/util.py
64
65
def validate_filename(filename):
    """Sanitize *filename* by replacing every disallowed character with '_'."""
    sanitized = re.sub(r'[^\w_.)(-]', '_', filename)
    return sanitized

xml_content_to_dict(element)

Source code in src/util.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def xml_content_to_dict(element):
    """
    Recursively convert an XML element into nested dicts/lists.

    Returns {key: value} where key is the element tag (or, for 'DataObject'
    / 'Attribute' tags, the element's 'ObjectType' / 'Name' attribute) and
    value is a dict of merged children, a list (when children are 'Array'
    elements), or a scalar coerced from the element text.
    """
    key = element.tag
    children = list(element)
    if key == 'Array':
        # An 'Array' element unwraps directly into a list of its children.
        res = [xml_content_to_dict(child) for child in children]
        return res
    if len(children) > 0:
        # Choose the container from the first child's tag; assumes children
        # are homogeneous (all 'Array' or none) — TODO confirm with inputs.
        if children[0].tag == 'Array':
            value = []
        else:
            value = {}
        for child in children:
            child_value = xml_content_to_dict(child)
            if isinstance(child_value, list):
                value.extend(child_value)
            else:
                value |= child_value  # dict merge; requires Python 3.9+
    else:
        # Leaf element: coerce the text to float/bool where possible.
        value = element.text
        if value is not None:
            if '"' in value:
                # Quoted text is treated as a plain string with quotes removed.
                value = value.replace('"', '')
            else:
                # NOTE(review): float is tried before int, so integer text
                # like '5' becomes 5.0 and the int branch never converts —
                # confirm this is intended.
                for t in (float, int, bool):
                    try:
                        if t == bool:
                            # Non-'true'/'false' text simply stays a string.
                            if value.lower() == 'true':
                                value = True
                            if value.lower() == 'false':
                                value = False
                        else:
                            value = t(value)
                        break
                    except (TypeError, ValueError):
                        pass

    # Re-key special container tags by their identifying attribute;
    # assumes these attributes are always present — TODO confirm.
    if key == 'DataObject':
        key = element.attrib['ObjectType']
    if key == 'Attribute':
        key = element.attrib['Name']
    return {key: value}