"""Reads and stitches images from a 2D grid into a mosaic"""
from collections import defaultdict
from itertools import chain
import glob
import imghdr
import json
import logging
import os
import re
from joblib import Parallel, delayed
import numpy as np
from .tile import Tile, OpenCVTile
logger = logging.getLogger(__name__)
[docs]
class Mosaic:
"""Stitches a mosaic from a list of tiles
Attributes
----------
grid : list
tiles arranged into a grid
shape : tuple
shape of the mosaic as (height, width[, channels])
size : tuple
number of tiles in the mosaic
tile_class : class
class to use for tiles in the mosaic
"""
#: int : Number of cores to use when processing images
num_cores = 1
[docs]
def __init__(self, path_or_tiles, tile_class=None):
"""Initializes a mosaic from a list of tiles
Parameters
----------
path_or_tiles : str or list-like
either the path to a directory containing tiles, a list of Tiles,
or a list of strings or arrays that can be used to create a Tile
tile_class : class
class to use for tiles in the mosaic. Defaults to OpenCVTile.
"""
# Use class of passed Tiles for tile_class
self.tile_class = tile_class
if not isinstance(path_or_tiles, str) and not isinstance(path_or_tiles[0], str):
# Get tile class from 1D list of Tile
if isinstance(path_or_tiles[0], Tile):
tile_class = path_or_tiles[0].__class__
# Get tile class from 2D grid of Tile
else:
tile_class = path_or_tiles[0][0].__class__
if self.tile_class and tile_class != self.tile_class:
logger.warning(
"`tile_class` does not match class of tiles in"
" `path_or_tiles`. Using class of tiles instead."
)
if not issubclass(tile_class, Tile):
raise ValueError(
f"tile_class must be Tile or subclass if inferred (got {tile_class}"
)
self.tile_class = tile_class
if not self.tile_class:
self.tile_class = OpenCVTile
self.shape = None
self.size = None
self.grid = None
self._pool = None
# Construct the grid underlying the mosaic
self._build_grid(path_or_tiles)
logger.info(f"Initialized {self}")
def __str__(self):
return f"<{self.__class__.__name__} shape={self.shape}>"
def __eq__(self, other):
return (
self.__class__ == other.__class__
and self.tile_class.__class__ == other.tile_class.__class__
and self.shape == other.shape
and self.size == other.size
and self.tiles == other.tiles
and self.grid == other.grid
)
@property
def tiles(self):
"""Gets a flattened list of all tiles in grid order"""
return list(chain.from_iterable(self.grid))
@property
def placed(self):
"""Calculates number of tiles that have been placed in the mosaic"""
return len([t for t in self.tiles if t.placed])
@property
def params(self):
"""Summarizes parameters needed to stitch mosaic"""
self._normalize_coordinates()
# Store coordinates for all placed tiles at full scale
coords = {}
filenames = {}
for i, tile in enumerate(self.tiles):
if tile.placed:
coords[i] = [tile.y / tile.scale, tile.x / tile.scale]
if isinstance(tile.source, str):
filenames[i] = os.path.basename(tile.source)
params = {
"metadata": {
"shape": list(self.shape),
"size": self.size,
"tile_shape": list(self.tiles[0].copy().reset().shape[:2]),
},
"coords": coords,
}
if filenames:
params["filenames"] = filenames
return params
@property
def detector(self):
"""Gets a copy of the detector used to align tiles in the mosaic"""
return self.tiles[0].detector
@detector.setter
def detector(self, val):
for tile in self.tiles:
tile.detector = val
@property
def matcher(self):
"""Gets a copy of the matcher used to align tiles in the mosaic
Only defined if using OpenCV.
"""
return self.tiles[0].matcher
@matcher.setter
def matcher(self, val):
for tile in self.tiles:
tile.matcher = val
@property
def pool(self):
"""Returns a shared joblib pool, creating it if needed"""
if self._pool is None:
self._pool = Parallel(n_jobs=self.num_cores)
return self._pool
@pool.setter
def pool(self, val):
self._pool = val
[docs]
def placeholder(self, tile=None, fill_value=0):
"""Creates a placeholder tile to fill in gaps in the mosaic
Arguments
---------
tile : Tile
tile to base the placeholder on. If not given, uses the first
tile in the tiles property.
fill_value : float or int
fill value
Returns
-------
Tile
placeholder tile filled with provided value
"""
if tile is None:
tile = self.tiles[0]
imdata = np.full(tile.shape, fill_value, dtype=tile.dtype)
tile = self.tile_class(imdata)
tile.is_placeholder = True
return tile
[docs]
def bounds(self):
"""Calculates bounds of the mosaic comprising the placed tiles
Returns
-------
tuple
bounds of tile as (y1, x1, y2, x2)
"""
ys = []
xs = []
for tile in self.tiles:
if tile.placed:
ys.append(tile.y)
xs.append(tile.x)
tile = self.tiles[0]
return min(ys), min(xs), max(ys) + tile.height, max(xs) + tile.width
[docs]
def copy(self):
"""Creates a copy of the mosaic based on the grid
Using the grid instead of the list of tiles allows the grid-building
step to be skipped when the copy is initialized.
Returns
-------
Mosaic
copy of the mosaic
"""
return self.__class__([[t.copy() for t in row] for row in self.grid])
[docs]
def save_params(self, path="params.json"):
"""Saves coordinates for placed tiles
Parameters
----------
path : str
path to the JSON file
"""
with open(path, "w", encoding="utf-8") as f:
json.dump(self.params, f, indent=4)
logger.info(f"Saved params for {self} to {path}")
[docs]
def load_params(self, path_or_obj="params.json"):
"""Loads coordinates for placed tiles at full scale
Parameters
----------
path_or_obj : str or dict
path to the JSON file or param dict from another mosaic
Raises
------
FileNotFoundError
thrown if input is path and path not found
ValueError
thrown if JSON can't be decoded or does not match this mosaic
"""
if isinstance(path_or_obj, str):
try:
with open(path_or_obj, "r", encoding="utf-8") as f:
path_or_obj = json.load(f)
except (FileNotFoundError, ValueError) as err:
logger.error(str(err), exc_info=err)
raise
# Confirm that metadata matches
for key, val in self.params["metadata"].items():
if path_or_obj.get("metadata", {}).get(key) != val:
msg = (
f"JSON param '{key}' does not match this mosaic"
f" ({val} != {path_or_obj.get('metadata', {}).get(key)})"
)
logger.error(msg)
raise ValueError(msg)
# Mosaics are similar, so update tile coordinates from params
for tile in self.tiles:
tile.y = None
tile.x = None
for key, (y, x) in path_or_obj["coords"].items():
tile = self.tiles[int(key)]
tile.y = y * tile.scale
tile.x = x * tile.scale
logger.info(f"Loaded previously calculated params for {self}")
[docs]
def reset_tiles(self):
"""Reloads tiles at their full resolution"""
self._normalize_coordinates()
return self._batch_tile_method("reset")
[docs]
def resize(self, size_or_shape, *args, **kwargs):
"""Rescales all tiles in the mosaic using size or shape
Parameters
----------
size_or_shape : int or tuple of ints
size in megapixels or shape of resized image
*args :
any argument accepted by the resize function used by the tile class
**kwargs :
any keyword argument accepted by the resize function used by the
tile class
"""
return self._batch_tile_method("resize", size_or_shape, *args, **kwargs)
[docs]
def downsample(self, size_or_shape, *args, **kwargs):
"""Downsamples all tiles in the mosaic using the given size or shape
Parameters
----------
size_or_shape : int or tuple of ints
size in megapixels or shape of resized image
*args :
any argument accepted by the resize function used by the tile class
**kwargs :
any keyword argument accepted by the resize function used by the
tile class
"""
return self._batch_tile_method("downsample", size_or_shape, *args, **kwargs)
[docs]
def detect_and_extract(self):
"""Detects and extracts features in tiles"""
return self._batch_tile_method(
"detect_and_extract",
batch=[t for t in self.tiles if not t.features_detected],
)
[docs]
def align(self, origin=None, limit=None, **kwargs):
"""Builds a mosaic by checking each tile against all others
Parameters
----------
origin : Tile
the tile around which to build the mosaic. If not given, method
will select the tile with the largest number of features.
limit : int
the number of tiles that must be successfully placed before the
method finishes. If not given, the method will continue until
it runs out of adjacent tiles with matching features. Setting
a limit allows a decent mosaic to be created quickly.
kwargs :
any keyword argument accepted by the align_to method on the
Tiles comprising this mosaic
"""
self.detect_and_extract()
# Limit to tiles with detected features
candidates = [t for t in self.tiles if t.features_detected]
if not candidates:
raise RuntimeError("No features detected in tiles")
# Start building from feature-rich tiles
candidates.sort(key=lambda t: -len(t.keypoints))
tiles = [origin if origin is not None else candidates.pop(0)]
# Set first tile to 0, 0
if tiles[0].y is None:
tiles[0].y = 0
if tiles[0].x is None:
tiles[0].x = 0
while tiles and candidates:
# Stop aligning if limit is reached
if limit is not None and self.placed >= limit:
logger.info(
f"Stopped aligning (limit={limit}," f" placed={self.placed})"
)
break
if len(tiles) == 1:
tile = tiles[0]
else:
tile = self.tile_class(tiles[0].draw(tiles[1:]))
tile.detect_and_extract()
tile.y = min([t.y for t in tiles])
tile.x = min([t.x for t in tiles])
self._batch_tile_method("align_to", tile, batch=candidates, **kwargs)
# Remove newly placed tiles from candidates
tiles = [t for t in candidates if t.placed]
candidates = [t for t in candidates if not t.placed]
else:
if limit is not None:
logger.warning(f"Failed to place {limit} tiles")
if self.placed == 1 and len(self.tiles) > 1:
raise RuntimeError("Could not align tiles")
logger.info(f"Aligned {self.placed} tiles in {self}")
[docs]
def build_out(self, *args, **kwargs):
"""Warns user that build_out is not implemented in Mosaic class
Use StructuredMosaic instead to get this functionality.
"""
logger.warning("build_out() ignored on Mosaic")
[docs]
def smooth_seams(self, origin=None):
"""Smooths intensities at seams between tiles
Parameters
----------
origin : Tile
starting tile
"""
tiles = [self._get_origin() if origin is None else origin]
placed = [t for t in self.tiles if t.placed and t not in tiles]
smoothed = []
min_xtn = 0
while tiles:
# Batch align adjacent tiles
unique = {}
for tile in tiles:
# Corner-to-corner matches can give bad results, so
# exclude small-area matches. This could cause problems
# with non-gridded tilesets.
xing = [t for t in placed if t.intersects(tile)]
if not min_xtn and len(xing) > 4:
xtns = [t.intersection(tile)[0].size for t in xing]
min_xtn = min(xtns) * 1.5
xing = [t for t in xing if t.intersection(tile)[0].size > min_xtn]
for neighbor in xing:
if neighbor not in smoothed:
unique.setdefault(neighbor.id, (tile, neighbor))
batch = list(unique.values())
self._batch_tile_method(
"match_gamma_to", [t for t, _ in batch], batch=[n for _, n in batch]
)
tiles = [n for _, n in batch]
smoothed.extend(tiles)
logger.info(f"Smoothed seams in {self}")
[docs]
def stitch(self, channel_order=None):
"""Stitches mosaic using either placed tiles or row/col of tiles
Parameters
----------
channel_order : str
order of the three color channels in the stitched array, for
example, RGB. Uses the backend order if not given, which can
give unexpected results (for example, OpenCV uses BGR).
Returns
-------
numpy.ndarray
"""
placed = [t for t in self.tiles if t.placed]
# If mosaic has not been aligned, set x and y based on tile location
reset_xy = not placed
if reset_xy:
y = 0
for row in self.grid:
x = 0
for tile in row:
tile.x = x
tile.y = y
x += tile.width
y += tile.height
placed = self.tiles
self._normalize_coordinates()
arr = placed[0].draw(placed[1:])
# Reorder color channels to match the specified order
if channel_order and channel_order.upper() != placed[0].channel_order:
order = [placed[0].channel_order.index(c) for c in channel_order.upper()]
order.extend(range(len(order), len(arr.shape) + 1))
arr = arr[..., order].copy()
# Reset tile x and y to None if set above
if reset_xy:
for tile in self.tiles:
tile.y = None
tile.x = None
logger.info(f"Drew {self}")
return arr
[docs]
def save(self, path):
"""Saves mosaic to path
Parameters
----------
path : str
file path
"""
self.tile_class.backend_save(path, self.stitch())
[docs]
def show(self, *args, **kwargs):
"""Shows the mosaic"""
self.tile_class.backend_show(self.stitch(), *args, **kwargs)
def _batch_tile_method(self, method, *args, batch=None, **kwargs):
"""Runs a tile method across many tiles, in f where possible
Eligible methods must:
+ Be chainable
+ Modify the tile in place
+ Only affect the current tile
+ Use a consistent set of arguments
Parameters
----------
method : str
name of a chainable tile method
*args:
any argument that can be passed to the method. If passed as a
list, it should have the same number of items as batch.
batch : list
list of tiles. If None, uses self.tiles instead.
**kwargs:
any keyword argument that can be passed to the method. If passed
as a list, it should have the same number of items as batch.W
"""
if batch is None:
batch = self.tiles
if not batch:
return
logger.info(
f"Running tile.{method}() using {self.num_cores}"
f" cores on batch of {len(batch)} tiles"
)
def task(tile, *args, **kwargs):
return getattr(tile, method)(*args, **kwargs)
# Each call gets it own set of args/kwargs supplied using using zip,
# so needs tp make sure the args/kwargs the right size
targs = [[] for _ in range(len(batch))]
for arg in args:
if not isinstance(arg, (list, tuple)) or len(arg) != len(batch):
arg = [arg] * len(batch)
for i, arg in enumerate(arg):
targs[i].append(arg)
# Ensure that keyword arguments all have same length as batch
tkwargs = [{} for _ in range(len(batch))]
for key, val in kwargs.items():
if not isinstance(val, (list, tuple)) or len(val) != len(batch):
val = [val] * len(batch)
for kwdict, val in zip(tkwargs, val):
kwdict[key] = val
results = self.pool(
delayed(task)(t, *a, **k) for t, a, k in zip(batch, targs, tkwargs)
)
for tile, result in zip(batch, results):
tile.update(result)
logger.info(f"Finished running tile.{method}() ({len(batch)} tiles)")
def _build_grid(self, path_or_tiles):
"""Builds grid and populates related attributes"""
tiles = self._get_tiles(path_or_tiles)
# Create the grid, which is meaningless for unstructured tilesets
# but provides a convenient way to view them
if self.grid is None:
self.shape = self._estimate_shape(tiles)
self.size = self.shape[0] * self.shape[1]
self.grid = build_grid(
tiles, self.shape[1], fill_value=self.placeholder(tiles[0])
)
self._verify_tiles()
# Situate each tile in the grid
for y, row in enumerate(self.grid):
for x, tile in enumerate(row):
tile.row = y
tile.col = x
tile.grid = self.grid
def _get_origin(self):
"""Selects a starting tile to work outward from"""
# If tiles have been placed, choose one that includes the center of
# the mosaic
placed = [t for t in self.tiles if t.placed]
if placed:
y1, x1, y2, x2 = self.bounds()
y_cent = (y2 - y1) / 2
x_cent = (x2 - x1) / 2
for tile in placed:
y1, x1, y2, x2 = tile.bounds()
if y1 <= y_cent <= y2 and x1 <= x_cent <= x2:
return tile
# If no tiles have been placed, choose based on the number of features
tiles = [t for t in self.tiles if t.features_detected]
if not tiles:
raise ValueError(
"No tiles with detected features found. This can happen"
" after running detect_and_extract() if reset_tiles()"
" is used."
)
tiles.sort(key=lambda t: -t.size)
return tiles[0]
def _get_tiles(self, path_or_tiles):
"""Gets a list of tiles
Parameters
----------
path_or_tiles : str or list-like
either the path to a directory containing tiles, a list of Tiles,
or a list of strings or arrays that can be used to create a Tile
Returns
-------
list of Tile
list of tiles based on input
"""
if isinstance(path_or_tiles, str):
logger.info(
f"Creating {self.__class__.__name__}" f" from path ({path_or_tiles})"
)
ext = self._check_tiles(path_or_tiles)
items = glob.iglob(os.path.join(path_or_tiles, f"*{ext}"))
elif is_grid(path_or_tiles):
logger.info(f"Creating {self.__class__.__name__} from 2D grid")
items = chain.from_iterable(path_or_tiles)
self.grid = path_or_tiles.copy()
self.shape = (len(path_or_tiles), len(path_or_tiles[0]))
self.size = self.shape[0] * self.shape[1]
else:
logger.info(f"Creating {self.__class__.__name__} from 1D list")
items = path_or_tiles
# Create and sort list of tiles
tiles = []
for item in items:
if item is None:
tiles.append(self.placeholder(tiles[0]))
elif isinstance(item, Tile):
tiles.append(item.copy())
else:
tiles.append(self.tile_class(item))
# Natural sort tiles if a directory was provided. Tiles provided as
# lists are kept in their original order.
if isinstance(path_or_tiles, str):
tiles.sort()
return tiles
def _normalize_coordinates(self):
"""Normalizes coordinates so that origin of the mosaic is (0, 0)"""
if self.placed:
min_y, min_x = self.bounds()[:2]
for tile in self.tiles:
if tile.placed:
tile.y -= min_y
tile.x -= min_x
def _verify_tiles(self):
"""Verifies that tiles and grid are valid and contain same objects"""
# if None in self.tiles:
# raise ValueError("Ragged grids are not allowed")
tile_ids = {t.id for t in self.tiles}
for row in self.grid:
for tile in row:
if tile.id not in tile_ids:
raise ValueError("grid and tiles contain different objects")
@staticmethod
def _check_tiles(path):
"""Finds and evaluates image files in the given directory"""
exts = defaultdict(int)
for fp in glob.iglob(os.path.join(path, "*.*")):
if imghdr.what(fp):
exts[os.path.splitext(fp)[1]] += 1
if not exts:
raise ValueError(f"No images found in `{path}`")
ext = [k for k, _ in sorted(exts.items(), key=lambda kv: -kv[1])][0]
return ext
@staticmethod
def _estimate_shape(tiles):
"""Fits tiles into a shape similar to a square"""
size = len(tiles)
widths = []
for num in range(1, size + 1):
if not size % num:
widths.append(num)
width = widths[len(widths) // 2]
height = size / width
shape = [height, width]
if tiles[0].channels != 1:
shape.append(tiles[0].channels)
return tuple([int(n) for n in shape])
[docs]
class StructuredMosaic(Mosaic):
"""Stitches a mosaic from a list of tiles with a known structure"""
[docs]
def __init__(
self,
path_or_tiles,
tile_class=None,
dim=None,
origin="upper left",
direction="horizontal",
pattern="raster",
):
"""Initializes a structured mosaic from a list of tiles
Parameters
----------
path_or_tiles : str or list-like
either the path to a directory containing tiles, a list of Tiles,
or a list of strings or arrays that can be used to create a Tile
tile_class : class
class to use for tiles in the mosaic. Defaults to OpenCVTile.
dim : tuple or int
either the shape of the mosaic as (height, width) or the number
of tiles in the direction traversed first, that is, the number of
columns (if horizontal) or number of rows (if vertical)
origin : str
the position of the first tile in the mosaic. One of "upper left",
"upper right", "lower left", or "lower right".
direction : str
direction to traverse first when building the mosaic. Either
"horizontal" or "vertical".
pattern : str
whether the grid is rastered or snaked. Either "raster" or "snake".
"""
super().__init__(path_or_tiles, tile_class=tile_class)
tiles = self._get_tiles(path_or_tiles)
# Create the grid if source is not a grid already
if self.grid is None:
self.shape, direction = self._refine_grid_params(tiles, dim, direction)
self.size = self.shape[0] * self.shape[1]
self.grid = build_grid(
tiles,
self.shape[1] if direction == "horizontal" else self.shape[0],
origin=origin,
direction=direction,
pattern=pattern,
fill_value=self.placeholder(tiles[0]),
)
self._verify_tiles()
# Situate each tile in the mosaic
for y, row in enumerate(self.grid):
for x, tile in enumerate(row):
tile.row = y
tile.col = x
tile.grid = self.grid
[docs]
def align(self, origin=None, limit=None, **kwargs):
"""Builds a mosaic outward from a single tile using feature matching
Parameters
----------
origin : Tile
the tile around which to build the mosaic. If not given, method
will select a tile near the center of the mosaic.
limit : int
the number of tiles that must be successfully placed before the
method finishes. If not given, the method will continue until
it runs out of adjacent tiles with matching features. Setting
a limit allows a decent mosaic to be created quickly.
kwargs :
any keyword argument accepted by the align_to method on the
Tiles comprising this mosaic
"""
# Align tiles outward from a single tile
tiles = [self._get_origin() if origin is None else origin]
# Set origin to 0, 0
if tiles[0].y is None:
tiles[0].y = 0
if tiles[0].x is None:
tiles[0].x = 0
while tiles:
# Stop aligning if limit is reached
if limit is not None and self.placed >= limit:
logger.info(f"Stopped aligning (limit={limit}, placed={self.placed})")
break
# Batch detect and extract features from tiles
unique = {}
for tile in tiles:
unique.setdefault(tile.id, tile)
for neighbor in tile.neighbors().values():
unique.setdefault(neighbor.id, neighbor)
batch = [t for t in unique.values() if not t.features_detected]
self._batch_tile_method("detect_and_extract", batch=batch)
# Batch align adjacent tiles
unique = {}
for tile in tiles:
for neighbor in tile.neighbors().values():
unique.setdefault(neighbor.id, (tile, neighbor))
batch = [(t, n) for t, n in unique.values() if not n.placed]
self._batch_tile_method(
"align_to", [t for t, _ in batch], batch=[n for _, n in batch], **kwargs
)
# Otherwise re-run the loop with the next group of tiles
tiles = [n for _, n in batch if n.placed]
else:
if limit is not None:
logger.warning(f"Failed to place {limit} tiles")
if self.placed == 1 and len(self.tiles) > 1:
raise RuntimeError("Could not align tiles")
logger.info(f"Aligned {self.placed} tiles in {self}")
[docs]
def build_out(self, from_placed=True, offsets=None):
"""Builds out from already placed tiles using the given offset
Used to complete mosaics that include tiles that were not placed
when the mosaic was built, either because the user assigned a limit
or because the feature matching algorithm failed to find a home for
them.
Parameters
----------
from_placed : bool
if True, unplaced tiles will be tacked onto already placed tiles
using the given offsets. If False, a new mosaic will be calculated
from scratch using the given offsets.
offsets : tuple
offsets between adjacent tiles as dy_row, dx_row, dy_col, dx_col.
If not given, the method will estimate the offsets if any tiles
have been placed or will ignore offsets if not.
"""
if offsets is None:
if [t for t in self.tiles if t.placed]:
offsets = self._estimate_offset()
else:
offsets = (self.tiles[0].height, 0, self.tiles[0].width, 0)
dy_row, dx_row, dy_col, dx_col = offsets
# Builds out from already placed tiles using the offsets
if from_placed:
tiles = [t for t in self.tiles if t.placed]
while tiles:
new = []
for tile in tiles:
for direction, neighbor in tile.neighbors().items():
if not neighbor.placed:
if direction == "top":
neighbor.y = tile.y - dy_row
neighbor.x = tile.x - dx_row
elif direction == "right":
neighbor.y = tile.y + dy_col
neighbor.x = tile.x + dx_col
elif direction == "bottom":
neighbor.y = tile.y + dy_row
neighbor.x = tile.x + dx_row
elif direction == "left":
neighbor.y = tile.y - dy_col
neighbor.x = tile.x - dx_col
new.append(neighbor)
tiles = new
logger.info(
f"Built {self} out from previously placed" f" tiles (offsets={offsets})"
)
# Builds mosaic from scratch based on the offsets
else:
for tile in self.tiles:
tile.y = tile.row * dy_row + tile.col * dy_col
tile.x = tile.row * dx_row + tile.col * dx_col
logger.info(f"Rebuilt {self} (offsets={offsets})")
def _build_grid(self, path_or_tiles):
"""Builds grid and populates related attributes"""
def _estimate_offset(self):
"""Estimates offset based on row and column
Returns
-------
tuple
average offsets between adjacent tiles as dy_row (change in y
between rows), dx_row (change in x between rows), dy_col (change
in y between columns), dx_col (change in x between columns)
"""
placed = [t for t in self.tiles if t.placed]
# Test size occupied by the placed tiles. If the tiles don't
# span at least two rows/columns, the offset calculation will
# give a bad result.
ys = []
xs = []
for tile in placed:
y1, x1, y2, x2 = tile.bounds(as_int=True)
ys.extend([y1, y2])
xs.extend([x1, x2])
height = max(ys) - min(ys)
width = max(xs) - min(xs)
tile = self.tiles[0]
if height < (tile.height * 1.5) or width < (tile.width * 1.5):
logger.warning(
"Placed tiles may be too close together to calculate an"
" accurate offset. If you specified a limit when running"
" align(), you may want to try a larger value."
)
# Get minimum row/col indexes so can normalize these to zero below
min_row = min([t.row for t in placed])
min_col = min([t.col for t in placed])
# Normalize coordinates to positive values starting at 0, 0
self._normalize_coordinates()
# Calculate average offsets based on row and column by looking at
# where tiles have been placed
a = []
by = []
bx = []
for tile in placed:
a.append([tile.row - min_row, tile.col - min_col])
by.append(tile.y)
bx.append(tile.x)
a = np.array(a)
by = np.array(by)
bx = np.array(bx)
dy_row, dy_col = np.linalg.lstsq(a, by, rcond=None)[0]
dx_row, dx_col = np.linalg.lstsq(a, bx, rcond=None)[0]
logger.info(
f"Estimated offsets in {self} as" f" {(dy_row, dx_row, dy_col, dx_col)}"
)
return dy_row, dx_row, dy_col, dx_col
def _get_origin(self):
"""Selects a starting tile to work outward from"""
return self.grid[self.shape[0] // 2][self.shape[1] // 2]
@staticmethod
def _refine_grid_params(tiles, dim, direction):
"""Refines shape and direction supplied during initialization"""
if dim is None or isinstance(dim, int):
if direction == "horizontal":
dim = (None, dim)
else:
dim = (dim, None)
height, width = dim
size = len(tiles)
if not width and not height:
# Check for file naming pattern used by the Mineral Sciences SEM
cols = []
for tile in tiles:
try:
cols.append(int(re.search(r"@(\d+)", tile.source).group(1)))
except (AttributeError, TypeError):
pass
if not cols:
raise ValueError("Could not infer shape of StructuredMosaic")
width = len(range(min(cols), max(cols) + 1))
direction = "vertical"
if width and not height:
while size % width:
size += 1
height = size // width
elif height and not width:
while size % height:
size += 1
width = size // height
shape = [height, width]
if tiles[0].channel_axis:
shape.append(tiles[0].channels)
return tuple([int(n) for n in shape]), direction
[docs]
def build_grid(
items,
dim,
origin="upper left",
direction="horizontal",
pattern="raster",
fill_value=None,
):
"""Builds a grid from a list
Parameters
----------
items : list
list to convert to a grid
dim : tuple or int
either the shape of the mosaic as (height, width) or the number
of tiles in the direction traversed first, that is, the number of
columns (if horizontal) or number of rows (if vertical)
origin : str
the position of the first tile in the mosaic. One of "upper left",
"upper right", "lower left", or "lower right".
direction : str
direction to traverse first when building the mosaic. Either
"horizontal" or "vertical".
pattern : str
whether the grid is a raster or snake
fill_value :
value used to fill missing items in a ragged grid
Returns
-------
list
List of rows in the grid
"""
# Validate keywords
origins = {
"ul": "upper left",
"ur": "upper right",
"lr": "lower right",
"ll": "lower left",
}
origin = origins.get(origin, origin)
if origin not in origins.values():
raise ValueError(f"origin must be a key or value in {origins}")
directions = ("horizontal", "vertical")
if direction not in directions:
raise ValueError(f"direction must be one of {directions}")
patterns = ("raster", "snake")
if pattern not in patterns:
raise ValueError(f"pattern must be one of {patterns}")
# Work from a copy of the list so that original remains intact
items = items.copy()
# Get expected dimensions of the mosaic
try:
dim_primary, dim_secondary = dim
except TypeError:
dim_primary = dim
dim_secondary = None
# Pad list so that all rows/columns have a full allotment of tiles
while len(items) % dim_primary:
items.append(None)
# Replace Nones with the given fill value
if None in items:
logger.warning("Grid is ragged")
if fill_value is not None:
for i, item in enumerate(items):
if item is None:
try:
items[i] = fill_value.copy()
except AttributeError:
items[i] = fill_value
# Split items into chunks of the given size. Use indexes instead of
# directly using the array so that arrays of arrays can be gridded
# without getting a warning from numpy.
rows = []
for row in np.array_split(np.arange(len(items)), len(items) / dim_primary):
rows.append([items[i] for i in row])
# Test if mosaic is expected size
if dim_secondary and len(rows) != dim_secondary:
raise ValueError("Grid is wrong shape")
# Reverse every other row if snake pattern
if pattern == "snake":
for i, row in enumerate(rows):
if i and i % 2:
row.reverse()
# Extract rows from columns if using vertical instead of horizontal
if direction == "vertical":
cols = rows
rows = []
for i in range(len(cols[0])):
row = []
for col in cols:
row.append(col[i])
rows.append(row)
# Reverse row order if origin is lower left or right corner
if "lower" in origin:
rows.reverse()
# Reverse order of individual rows if origin is on the right
if "right" in origin:
for row in rows:
row.reverse()
return rows
[docs]
def create_mosaic(
path_or_tiles, tile_class=None, dim=None, origin=None, direction=None, pattern=None
):
"""Creates a mosaic
See StructuredMosaic for available parameters.
Returns
-------
Mosaic or StructuredMosaic
tiles as either a structured or unstructured mosaic
"""
kwargs = {"dim": dim, "origin": origin, "direction": direction, "pattern": pattern}
kwargs = {k: v for k, v in kwargs.items() if v is not None}
try:
return StructuredMosaic(path_or_tiles, tile_class=tile_class, **kwargs)
except ValueError:
if kwargs:
raise
return Mosaic(path_or_tiles, tile_class=tile_class)
[docs]
def is_grid(items):
"""Tests if an iterable looks like a grid
Parameters
----------
items : list-like
list of items
Returns
-------
bool
True if tiles look like a grid, False if not
"""
# Both items and all direct children of items must be iterable
try:
iter(items)
iter(items[0])
except (IndexError, TypeError):
return False
# Strings are iterable, so rule them out too
if isinstance(items[0], str):
return False
# Check if all rows have the same length
if len({len(r) for r in items}) > 1:
return False
# Test if individual items are or can be made into tiles
val = items[0][0]
if isinstance(val, (str, Tile)) or (
isinstance(val, np.ndarray) and 2 <= len(val.shape) <= 3
):
return True
return False