From 9b71edd860f51ac0306757cacb50f1f520200635 Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 11:49:29 -0800 Subject: [PATCH 1/7] Added tests for SimpleNMF --- albumentations/augmentations/functional.py | 275 +++++++++++++++++++++ albumentations/augmentations/transforms.py | 194 +++++++++++++++ tests/functional/test_functional.py | 90 +++++++ 3 files changed, 559 insertions(+) diff --git a/albumentations/augmentations/functional.py b/albumentations/augmentations/functional.py index a1d941563..3c688d584 100644 --- a/albumentations/augmentations/functional.py +++ b/albumentations/augmentations/functional.py @@ -3136,3 +3136,278 @@ def get_mud_params( "mud": mud.astype(np.float32), "non_mud": non_mud.astype(np.float32), } + + +# Standard reference H&E stain matrices +STAIN_MATRICES = { + "ruifrok": np.array( + [ # Ruifrok & Johnston standard reference + [0.644211, 0.716556, 0.266844], # Hematoxylin + [0.092789, 0.954111, 0.283111], # Eosin + ], + ), + "macenko": np.array( + [ # Macenko's reference + [0.5626, 0.7201, 0.4062], + [0.2159, 0.8012, 0.5581], + ], + ), + "standard": np.array( + [ # Standard bright-field microscopy + [0.65, 0.70, 0.29], + [0.07, 0.99, 0.11], + ], + ), + "high_contrast": np.array( + [ # Enhanced contrast + [0.55, 0.88, 0.11], + [0.12, 0.86, 0.49], + ], + ), + "h_heavy": np.array( + [ # Hematoxylin dominant + [0.75, 0.61, 0.32], + [0.04, 0.93, 0.36], + ], + ), + "e_heavy": np.array( + [ # Eosin dominant + [0.60, 0.75, 0.28], + [0.17, 0.95, 0.25], + ], + ), + "dark": np.array( + [ # Darker staining + [0.78, 0.55, 0.28], + [0.09, 0.97, 0.21], + ], + ), + "light": np.array( + [ # Lighter staining + [0.57, 0.71, 0.38], + [0.15, 0.89, 0.42], + ], + ), +} + + +def get_normalizer(method: Literal["vahadane", "macenko"], random_state: np.random.RandomState) -> StainNormalizer: + """Get stain normalizer based on method.""" + return VahadaneNormalizer(random_state) if method == "vahadane" else MacenkoNormalizer(random_state) + + +class StainNormalizer: + """Base class for stain normalizers.""" + + def __init__(self, random_state: np.random.RandomState) -> None: + self.stain_matrix_target = None + self.random_state = random_state + + def fit(self, img: np.ndarray) -> None: + """Extract stain matrix from image.""" + raise NotImplementedError + + +class SimpleNMF: + """Simple Non-negative Matrix Factorization for H&E stain separation. + + This is a simplified implementation specifically for H&E staining, + where we know we want exactly 2 components (H&E) and have RGB data. + """ + + def __init__(self, random_state: np.random.RandomState, n_iter: int = 100): + self.n_iter = n_iter + self.random_state = random_state + + def fit_transform(self, optical_density: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Decompose optical density matrix into stain concentrations and colors. 
+ + Args: + optical_density: Input matrix of shape (n_pixels, n_channels) + Optical density values of shape (n_pixels, 3) for RGB + + Returns: + stain_concentrations: Matrix of shape (n_pixels, 2) for H&E concentrations + stain_colors: Matrix of shape (2, 3) for RGB colors of each stain + """ + n_pixels, n_channels = optical_density.shape + n_stains = 2 # H&E stains + + # Initialize random matrices + stain_concentrations = self.random_state.uniform(0, 1, (n_pixels, n_stains)) + stain_colors = self.random_state.uniform(0, 1, (n_stains, n_channels)) + + # Normalize stain colors to unit norm + stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + + # Iterative update rules + eps = 1e-7 # Small number to prevent division by zero + for _ in range(self.n_iter): + # Update concentrations + conc_numerator = np.dot(optical_density, stain_colors.T) + conc_denominator = np.dot(stain_concentrations, np.dot(stain_colors, stain_colors.T)) + eps + stain_concentrations *= conc_numerator / conc_denominator + + # Update colors + color_numerator = np.dot(stain_concentrations.T, optical_density) + color_denominator = np.dot(np.dot(stain_concentrations.T, stain_concentrations), stain_colors) + eps + stain_colors *= color_numerator / color_denominator + + # Normalize stain colors to unit norm + stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + + return stain_concentrations, stain_colors + + +class VahadaneNormalizer(StainNormalizer): + """Vahadane stain normalizer using simplified NMF.""" + + def fit(self, img: np.ndarray) -> None: + """Extract H&E stain matrix using Vahadane's method. + + Args: + img: RGB image of shape (height, width, 3) + """ + # Convert to optical density + optical_density = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + # Apply NMF to get stain concentrations and colors + nmf = SimpleNMF(self.random_state, n_iter=100) + _, stain_colors = nmf.fit_transform(optical_density) + + # Order H&E components by their angles in the color plane + color_angles = np.arctan2(stain_colors[:, 1], stain_colors[:, 0]) + hematoxylin_idx = np.argmax(color_angles) + eosin_idx = 1 - hematoxylin_idx + + self.stain_matrix_target = np.array( + [ + stain_colors[hematoxylin_idx], # Hematoxylin stain color + stain_colors[eosin_idx], # Eosin stain color + ], + ) + + +class MacenkoNormalizer(StainNormalizer): + """Macenko stain normalizer.""" + + def fit(self, img: np.ndarray, angular_percentile: float = 99) -> None: + """Extract H&E stain matrix using Macenko's method. + + Args: + img: RGB image of shape (height, width, 3) + angular_percentile: Percentile for angle selection. Higher values + give more robust stain vectors. 
Range: 0-100 + """ + # Convert to optical density + optical_density = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + # Remove data points with very low optical density + density_threshold = 0.15 + valid_density_mask = np.all(optical_density > density_threshold, axis=1) + filtered_density = optical_density[valid_density_mask] + + # Compute eigenvectors of optical density covariance + _, eigenvectors = np.linalg.eigh(np.cov(filtered_density.T)) + stain_eigenvectors = eigenvectors[:, [2, 1]] # Select top 2 eigenvectors + + # Project data onto plane spanned by eigenvectors + density_projection = np.dot(filtered_density, stain_eigenvectors) + + # Find angular coordinates of points in the plane + stain_angles = np.arctan2(density_projection[:, 1], density_projection[:, 0]) + + # Find robust extremes (arctan2 returns values between -pi and pi) + hematoxylin_angle = np.percentile(stain_angles, 100 - angular_percentile) + eosin_angle = np.percentile(stain_angles, angular_percentile) + + # Convert angles back to stain vectors + hematoxylin_vector = np.dot( + stain_eigenvectors, + np.array([np.cos(hematoxylin_angle), np.sin(hematoxylin_angle)]), + ) + eosin_vector = np.dot( + stain_eigenvectors, + np.array([np.cos(eosin_angle), np.sin(eosin_angle)]), + ) + + # Order H&E components by their angles + if hematoxylin_vector[0] > eosin_vector[0]: + self.stain_matrix_target = np.array([hematoxylin_vector, eosin_vector]) + else: + self.stain_matrix_target = np.array([eosin_vector, hematoxylin_vector]) + + +def get_stain_concentrations( + img: np.ndarray, + stain_matrix: np.ndarray, +) -> np.ndarray: + """Extract stain concentrations from the image using the given stain matrix.""" + # Convert to optical density space + od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + # Solve for concentrations using pseudo-inverse + concentrations = np.linalg.lstsq(stain_matrix, od.T, rcond=None)[0].T + return np.maximum(concentrations, 0) + + +def get_tissue_mask(img: np.ndarray, threshold: float = 0.85) -> np.ndarray: + """Get binary mask of tissue regions based on luminosity. + + Args: + img: RGB image + threshold: Luminosity threshold. Pixels with luminosity below this value + are considered tissue. Range: 0 to 1. Default: 0.85 + + Returns: + Binary mask where True indicates tissue regions + """ + # Convert to grayscale using OpenCV's weighted formula + luminosity = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) + + # Apply threshold (cv2.THRESH_BINARY_INV because tissue is darker) + _, mask = cv2.threshold( + src=luminosity, + thresh=int(threshold * 255), + maxval=1, + type=cv2.THRESH_BINARY_INV, + ) + + return mask.reshape(-1).astype(bool) + + +def apply_he_stain_augmentation( + img: np.ndarray, + stain_matrix: np.ndarray, + scale_factors: np.ndarray, + shift_values: np.ndarray, + augment_background: bool, +) -> np.ndarray: + """Apply H&E stain augmentation to the image. 
+ + Args: + img: RGB image to augment + stain_matrix: 2x3 matrix containing H&E stain vectors + scale_factors: Multiplicative factors for H&E concentrations + shift_values: Additive shifts for H&E concentrations + augment_background: Whether to apply augmentation to background regions + + Returns: + Augmented RGB image + """ + # Get concentrations and tissue mask + concentrations = get_stain_concentrations(img, stain_matrix) + tissue_mask = get_tissue_mask(img) if not augment_background else None + + # Apply augmentation + if tissue_mask is not None: + concentrations[tissue_mask] = concentrations[tissue_mask] * scale_factors + shift_values + else: + concentrations = concentrations * scale_factors + shift_values + + # Reconstruct image + od = np.dot(concentrations, stain_matrix) + img_augmented = 255 * np.exp(-od) + img_augmented = img_augmented.reshape(img.shape) + + return np.clip(img_augmented, 0, 255).astype(np.uint8) diff --git a/albumentations/augmentations/transforms.py b/albumentations/augmentations/transforms.py index c4b54a565..a2151a663 100644 --- a/albumentations/augmentations/transforms.py +++ b/albumentations/augmentations/transforms.py @@ -6483,3 +6483,197 @@ def apply_to_volumes(self, volumes: np.ndarray, **params: Any) -> np.ndarray: def get_transform_init_args_names(self) -> tuple[str, ...]: return "cutoff", "ignore", "method" + + +class HEStain(ImageOnlyTransform): + """Applies H&E (Hematoxylin and Eosin) stain augmentation to histopathology images. + + This transform simulates different H&E staining conditions using either: + 1. Predefined stain matrices (8 standard references) + 2. Vahadane method for stain extraction + 3. Macenko method for stain extraction + 4. Custom stain matrices + + Args: + method: Method to use for stain augmentation: + - "preset": Use predefined stain matrices + - "vahadane": Extract using Vahadane method + - "macenko": Extract using Macenko method + - "custom": Use custom provided matrix + Default: "preset" + + preset: Preset stain matrix to use when method="preset": + - "ruifrok": Standard reference from Ruifrok & Johnston + - "macenko": Reference from Macenko's method + - "standard": Typical bright-field microscopy + - "high_contrast": Enhanced contrast + - "h_heavy": Hematoxylin dominant + - "e_heavy": Eosin dominant + - "dark": Darker staining + - "light": Lighter staining + Default: "standard" + + custom_matrix: Custom stain matrix to use when method="custom". + Shape should be (2, 3) for H&E stains. + + intensity_scale_range: Range for multiplicative stain intensity variation. + Values are multipliers between 0.5 and 1.5. For example: + - (0.7, 1.3) means stain intensities will vary from 70% to 130% + - (0.9, 1.1) gives subtle variations + - (0.5, 1.5) gives dramatic variations + Default: (0.7, 1.3) + + intensity_shift_range: Range for additive stain intensity variation. + Values between -0.3 and 0.3. For example: + - (-0.2, 0.2) means intensities will be shifted by -20% to +20% + - (-0.1, 0.1) gives subtle shifts + - (-0.3, 0.3) gives dramatic shifts + Default: (-0.2, 0.2) + + augment_background: Whether to apply augmentation to background regions. + Default: False + + Targets: + image + + Image types: + uint8, float32 + + References: + .. [1] A. C. Ruifrok and D. A. Johnston, "Quantification of histochemical + staining by color deconvolution," Analytical and quantitative + cytology and histology, 2001. + .. [2] M. 
Macenko et al., "A method for normalizing histology slides for + quantitative analysis," 2009 IEEE International Symposium on + Biomedical Imaging, 2009. + """ + + class InitSchema(BaseTransformInitSchema): + method: Literal["preset", "vahadane", "macenko", "custom"] + preset: ( + Literal[ + "ruifrok", + "macenko", + "standard", + "high_contrast", + "h_heavy", + "e_heavy", + "dark", + "light", + ] + | None + ) + custom_matrix: np.ndarray | None + intensity_scale_range: Annotated[ + tuple[float, float], + AfterValidator(nondecreasing), + AfterValidator(check_range_bounds(0, None)), + ] + intensity_shift_range: Annotated[ + tuple[float, float], + AfterValidator(nondecreasing), + AfterValidator(check_range_bounds(-1, 1)), + ] + augment_background: bool + + @model_validator(mode="after") + def validate_matrix_selection(self) -> Self: + if self.method == "preset" and self.preset is None: + self.preset = "standard" + elif self.method == "custom" and self.custom_matrix is None: + raise ValueError("custom_matrix must be provided when method='custom'") + elif self.method == "custom" and self.custom_matrix is not None and self.custom_matrix.shape != (2, 3): + raise ValueError("custom_matrix must have shape (2, 3)") + return self + + def __init__( + self, + method: Literal["preset", "vahadane", "macenko", "custom"] = "preset", + preset: Literal[ + "ruifrok", + "macenko", + "standard", + "high_contrast", + "h_heavy", + "e_heavy", + "dark", + "light", + ] + | None = None, + custom_matrix: np.ndarray | None = None, + intensity_scale_range: tuple[float, float] = (0.7, 1.3), + intensity_shift_range: tuple[float, float] = (-0.2, 0.2), + augment_background: bool = False, + p: float = 0.5, + ): + super().__init__(p=p) + self.method = method + self.preset = preset if preset is not None else "standard" + self.custom_matrix = custom_matrix + self.intensity_scale_range = intensity_scale_range + self.intensity_shift_range = intensity_shift_range + self.augment_background = augment_background + + if method in ["vahadane", "macenko"]: + self.stain_extractor = fmain.get_normalizer(cast(Literal["vahadane", "macenko"], method), self.py_random) + + def get_stain_matrix(self, img: np.ndarray) -> np.ndarray: + """Get stain matrix based on selected method.""" + if self.method == "preset": + return fmain.STAIN_MATRICES[self.preset] + if self.method == "custom": + return self.custom_matrix + # vahadane or macenko + self.stain_extractor.fit(img) + return self.stain_extractor.stain_matrix_target + + def apply( + self, + img: np.ndarray, + stain_matrix: np.ndarray, + scale_factors: np.ndarray, + shift_values: np.ndarray, + **params: Any, + ) -> np.ndarray: + return fmain.apply_he_stain_augmentation( + img=img, + stain_matrix=stain_matrix, + scale_factors=scale_factors, + shift_values=shift_values, + augment_background=self.augment_background, + ) + + def get_params_dependent_on_data(self, params: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]: + # Get stain matrix + image = data["image"] + stain_matrix = self.get_stain_matrix(image) + + # Generate random scaling and shift parameters for both H&E channels + scale_factors = np.array( + [ + self.py_random.uniform(*self.intensity_scale_range), + self.py_random.uniform(*self.intensity_scale_range), + ], + ) + shift_values = np.array( + [ + self.py_random.uniform(*self.intensity_shift_range), + self.py_random.uniform(*self.intensity_shift_range), + ], + ) + + return { + "stain_matrix": stain_matrix, + "scale_factors": scale_factors, + "shift_values": shift_values, + } + + 
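Review note, not part of the patch: a minimal usage sketch for the transform added in this commit (assumes this branch is installed as `albumentations`; the random uint8 array is only a stand-in for a real H&E tile). Parameter names follow the `__init__` signature above.

import numpy as np
import albumentations as A

# Illustrative only: preset-based H&E augmentation with the defaults from this commit.
transform = A.HEStain(
    method="preset",
    preset="standard",                 # one of the eight STAIN_MATRICES keys
    intensity_scale_range=(0.7, 1.3),  # multiplicative H/E variation
    intensity_shift_range=(-0.2, 0.2), # additive H/E variation
    augment_background=False,
    p=1.0,
)
tile = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8)  # stand-in H&E tile
augmented = transform(image=tile)["image"]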
def get_transform_init_args_names(self) -> tuple[str, ...]: + return ( + "method", + "preset", + "custom_matrix", + "intensity_scale_range", + "intensity_shift_range", + "augment_background", + ) diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py index e9a2b35c4..da174c802 100644 --- a/tests/functional/test_functional.py +++ b/tests/functional/test_functional.py @@ -22,6 +22,7 @@ ) from tests.utils import convert_2d_to_target_format from copy import deepcopy +from sklearn.decomposition import NMF @pytest.mark.parametrize( @@ -2721,3 +2722,92 @@ def test_deterministic(): np.testing.assert_array_almost_equal(result1["mud"], result2["mud"]) np.testing.assert_array_almost_equal(result1["non_mud"], result2["non_mud"]) + + + +@pytest.fixture +def random_state(): + return np.random.RandomState(42) + +@pytest.mark.parametrize( + ["height", "width", "n_iter"], + [ + (100, 100, 50), # Small square image + (200, 100, 100), # Rectangle image + (50, 50, 200), # Small image, more iterations + ] +) +def test_simple_nmf_shape(height, width, n_iter, random_state): + # Create synthetic H&E-like data + img = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8) + od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + # Our implementation + nmf = fmain.SimpleNMF(random_state=random_state, n_iter=n_iter) + concentrations, colors = nmf.fit_transform(od) + + # Check shapes + assert concentrations.shape == (height * width, 2) + assert colors.shape == (2, 3) + + # Check non-negativity + assert np.all(concentrations >= 0) + assert np.all(colors >= 0) + + # Check unit norm constraint on colors + np.testing.assert_allclose( + np.sum(colors**2, axis=1), + np.ones(2), + rtol=1e-5 + ) + +@pytest.mark.parametrize( + ["height", "width"], + [ + (100, 100), # Square image + (200, 100), # Rectangle image + ] +) +def test_simple_nmf_against_sklearn(height, width, random_state): + # Create synthetic H&E-like data + img = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8) + od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + # Our implementation + our_nmf = fmain.SimpleNMF(random_state=random_state, n_iter=100) + our_concentrations, our_colors = our_nmf.fit_transform(od) + + # Sklearn implementation + sklearn_nmf = NMF( + n_components=2, + init='random', + random_state=42, + max_iter=100 + ) + sklearn_concentrations = sklearn_nmf.fit_transform(od) + sklearn_colors = sklearn_nmf.components_ + + # Reconstruction error comparison + our_reconstruction = np.dot(our_concentrations, our_colors) + sklearn_reconstruction = np.dot(sklearn_concentrations, sklearn_colors) + + our_error = np.mean((od - our_reconstruction) ** 2) + sklearn_error = np.mean((od - sklearn_reconstruction) ** 2) + + # Our error should be within a reasonable factor of sklearn's + assert our_error <= 2 * sklearn_error + + +def test_simple_nmf_reproducibility(random_state): + # Test that same random state produces same results + img = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8) + od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + + nmf1 = fmain.SimpleNMF(random_state=np.random.RandomState(42)) + nmf2 = fmain.SimpleNMF(random_state=np.random.RandomState(42)) + + result1 = nmf1.fit_transform(od) + result2 = nmf2.fit_transform(od) + + np.testing.assert_allclose(result1[0], result2[0]) + np.testing.assert_allclose(result1[1], result2[1]) From 5b1fe2a3b7ba909c6962fc6cd6a8416814b71bae Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 
13:55:40 -0800 Subject: [PATCH 2/7] Pass tests in functional --- albumentations/augmentations/functional.py | 158 ++++++++------- albumentations/augmentations/transforms.py | 34 +++- tests/aug_definitions.py | 1 + tests/functional/test_functional.py | 217 +++++++++++++++++++++ 4 files changed, 330 insertions(+), 80 deletions(-) diff --git a/albumentations/augmentations/functional.py b/albumentations/augmentations/functional.py index 3c688d584..4efb0296f 100644 --- a/albumentations/augmentations/functional.py +++ b/albumentations/augmentations/functional.py @@ -3289,66 +3289,79 @@ def fit(self, img: np.ndarray) -> None: class MacenkoNormalizer(StainNormalizer): - """Macenko stain normalizer.""" + """Macenko stain normalizer with optimized computations.""" - def fit(self, img: np.ndarray, angular_percentile: float = 99) -> None: - """Extract H&E stain matrix using Macenko's method. - - Args: - img: RGB image of shape (height, width, 3) - angular_percentile: Percentile for angle selection. Higher values - give more robust stain vectors. Range: 0-100 - """ - # Convert to optical density - optical_density = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + def __init__(self, random_state: np.random.RandomState, angular_percentile: float = 99): + super().__init__(random_state) + self.angular_percentile = angular_percentile - # Remove data points with very low optical density - density_threshold = 0.15 - valid_density_mask = np.all(optical_density > density_threshold, axis=1) - filtered_density = optical_density[valid_density_mask] - - # Compute eigenvectors of optical density covariance - _, eigenvectors = np.linalg.eigh(np.cov(filtered_density.T)) - stain_eigenvectors = eigenvectors[:, [2, 1]] # Select top 2 eigenvectors - - # Project data onto plane spanned by eigenvectors - density_projection = np.dot(filtered_density, stain_eigenvectors) + def fit(self, img: np.ndarray, angular_percentile: float = 99) -> None: + """Extract H&E stain matrix using optimized Macenko's method.""" + # Step 1: Convert RGB to optical density (OD) space + pixel_matrix = img.reshape(-1, 3).astype(np.float32) + pixel_matrix = np.maximum(pixel_matrix, 1) + optical_density = -np.log(pixel_matrix / 255.0) + + # Step 2: Remove background pixels + od_threshold = 0.05 + threshold_mask = (optical_density > od_threshold).any(axis=1) + tissue_density = optical_density[threshold_mask] + + if len(tissue_density) < 1: + raise ValueError(f"No tissue pixels found (threshold={od_threshold})") + + # Step 3: Compute covariance matrix + tissue_density = np.ascontiguousarray(tissue_density, dtype=np.float32) + od_covariance = cv2.calcCovarMatrix( + tissue_density, + None, + cv2.COVAR_NORMAL | cv2.COVAR_ROWS | cv2.COVAR_SCALE, + )[0] + + # Step 4: Get principal components + eigenvalues, eigenvectors = cv2.eigen(od_covariance)[1:] + idx = np.argsort(eigenvalues.ravel())[-2:] + principal_eigenvectors = np.ascontiguousarray(eigenvectors[:, idx], dtype=np.float32) + + # Step 5: Project onto eigenvector plane + plane_coordinates = tissue_density @ principal_eigenvectors + + # Step 6: Find angles of extreme points + polar_angles = np.arctan2( + plane_coordinates[:, 1], + plane_coordinates[:, 0], + ) - # Find angular coordinates of points in the plane - stain_angles = np.arctan2(density_projection[:, 1], density_projection[:, 0]) + # Get robust angle estimates + hematoxylin_angle = np.percentile(polar_angles, 100 - angular_percentile) + eosin_angle = np.percentile(polar_angles, angular_percentile) - # Find robust extremes (arctan2 
returns values between -pi and pi) - hematoxylin_angle = np.percentile(stain_angles, 100 - angular_percentile) - eosin_angle = np.percentile(stain_angles, angular_percentile) + # Step 7: Convert angles back to RGB space + hem_cos, hem_sin = np.cos(hematoxylin_angle), np.sin(hematoxylin_angle) + eos_cos, eos_sin = np.cos(eosin_angle), np.sin(eosin_angle) - # Convert angles back to stain vectors - hematoxylin_vector = np.dot( - stain_eigenvectors, - np.array([np.cos(hematoxylin_angle), np.sin(hematoxylin_angle)]), + angle_to_vector = np.array( + [[hem_cos, hem_sin], [eos_cos, eos_sin]], + dtype=np.float32, ) - eosin_vector = np.dot( - stain_eigenvectors, - np.array([np.cos(eosin_angle), np.sin(eosin_angle)]), + stain_vectors = cv2.gemm( + angle_to_vector, + principal_eigenvectors.T, + 1, + None, + 0, ) - # Order H&E components by their angles - if hematoxylin_vector[0] > eosin_vector[0]: - self.stain_matrix_target = np.array([hematoxylin_vector, eosin_vector]) - else: - self.stain_matrix_target = np.array([eosin_vector, hematoxylin_vector]) + # Step 8: Ensure non-negativity by taking absolute values + # This is valid because stain vectors represent absorption coefficients + stain_vectors = np.abs(stain_vectors) + # Step 9: Normalize vectors to unit length + stain_vectors = stain_vectors / np.sqrt(np.sum(stain_vectors**2, axis=1, keepdims=True)) -def get_stain_concentrations( - img: np.ndarray, - stain_matrix: np.ndarray, -) -> np.ndarray: - """Extract stain concentrations from the image using the given stain matrix.""" - # Convert to optical density space - od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) - - # Solve for concentrations using pseudo-inverse - concentrations = np.linalg.lstsq(stain_matrix, od.T, rcond=None)[0].T - return np.maximum(concentrations, 0) + # Step 10: Order vectors as [hematoxylin, eosin] + # Hematoxylin typically has larger red component + self.stain_matrix_target = stain_vectors if stain_vectors[0, 0] > stain_vectors[1, 0] else stain_vectors[::-1] def get_tissue_mask(img: np.ndarray, threshold: float = 0.85) -> np.ndarray: @@ -3376,6 +3389,8 @@ def get_tissue_mask(img: np.ndarray, threshold: float = 0.85) -> np.ndarray: return mask.reshape(-1).astype(bool) +@clipped +@uint8_io def apply_he_stain_augmentation( img: np.ndarray, stain_matrix: np.ndarray, @@ -3383,31 +3398,28 @@ def apply_he_stain_augmentation( shift_values: np.ndarray, augment_background: bool, ) -> np.ndarray: - """Apply H&E stain augmentation to the image. 
- - Args: - img: RGB image to augment - stain_matrix: 2x3 matrix containing H&E stain vectors - scale_factors: Multiplicative factors for H&E concentrations - shift_values: Additive shifts for H&E concentrations - augment_background: Whether to apply augmentation to background regions - - Returns: - Augmented RGB image - """ - # Get concentrations and tissue mask - concentrations = get_stain_concentrations(img, stain_matrix) - tissue_mask = get_tissue_mask(img) if not augment_background else None - - # Apply augmentation - if tissue_mask is not None: + # Step 1: Convert to optical density + pixel_matrix = img.reshape(-1, 3).astype(np.float32) + pixel_matrix = np.maximum(pixel_matrix, 1) + optical_density = -np.log(pixel_matrix / 255.0) + + # Step 2: Calculate concentrations + stain_matrix = np.ascontiguousarray(stain_matrix, dtype=np.float32) + concentrations = np.linalg.solve( + stain_matrix @ stain_matrix.T, + stain_matrix @ optical_density.T, + ).T + + # Step 3: Apply scaling and shifting + if not augment_background: + tissue_mask = get_tissue_mask(img).reshape(-1) concentrations[tissue_mask] = concentrations[tissue_mask] * scale_factors + shift_values else: + # Use numpy operations instead of cv2 concentrations = concentrations * scale_factors + shift_values - # Reconstruct image - od = np.dot(concentrations, stain_matrix) - img_augmented = 255 * np.exp(-od) - img_augmented = img_augmented.reshape(img.shape) + # Step 4: Reconstruct image + od_result = concentrations @ stain_matrix + rgb_result = 255.0 * np.exp(-od_result) - return np.clip(img_augmented, 0, 255).astype(np.uint8) + return rgb_result.reshape(img.shape) diff --git a/albumentations/augmentations/transforms.py b/albumentations/augmentations/transforms.py index a2151a663..de68cb78d 100644 --- a/albumentations/augmentations/transforms.py +++ b/albumentations/augmentations/transforms.py @@ -94,6 +94,7 @@ "FancyPCA", "FromFloat", "GaussNoise", + "HEStain", "HueSaturationValue", "ISONoise", "Illumination", @@ -6497,6 +6498,7 @@ class HEStain(ImageOnlyTransform): Args: method: Method to use for stain augmentation: - "preset": Use predefined stain matrices + - "random_preset": Randomly select a preset matrix each time - "vahadane": Extract using Vahadane method - "macenko": Extract using Macenko method - "custom": Use custom provided matrix @@ -6549,7 +6551,7 @@ class HEStain(ImageOnlyTransform): """ class InitSchema(BaseTransformInitSchema): - method: Literal["preset", "vahadane", "macenko", "custom"] + method: Literal["preset", "random_preset", "vahadane", "macenko", "custom"] preset: ( Literal[ "ruifrok", @@ -6580,6 +6582,8 @@ class InitSchema(BaseTransformInitSchema): def validate_matrix_selection(self) -> Self: if self.method == "preset" and self.preset is None: self.preset = "standard" + elif self.method == "random_preset" and self.preset is not None: + raise ValueError("preset should not be specified when method='random_preset'") elif self.method == "custom" and self.custom_matrix is None: raise ValueError("custom_matrix must be provided when method='custom'") elif self.method == "custom" and self.custom_matrix is not None and self.custom_matrix.shape != (2, 3): @@ -6588,7 +6592,7 @@ def validate_matrix_selection(self) -> Self: def __init__( self, - method: Literal["preset", "vahadane", "macenko", "custom"] = "preset", + method: Literal["preset", "random_preset", "vahadane", "macenko", "custom"] = "random_preset", preset: Literal[ "ruifrok", "macenko", @@ -6598,8 +6602,7 @@ def __init__( "e_heavy", "dark", "light", - ] - | 
None = None, + ] = "standard", custom_matrix: np.ndarray | None = None, intensity_scale_range: tuple[float, float] = (0.7, 1.3), intensity_shift_range: tuple[float, float] = (-0.2, 0.2), @@ -6608,19 +6611,36 @@ def __init__( ): super().__init__(p=p) self.method = method - self.preset = preset if preset is not None else "standard" + self.preset = preset self.custom_matrix = custom_matrix self.intensity_scale_range = intensity_scale_range self.intensity_shift_range = intensity_shift_range self.augment_background = augment_background if method in ["vahadane", "macenko"]: - self.stain_extractor = fmain.get_normalizer(cast(Literal["vahadane", "macenko"], method), self.py_random) + self.stain_extractor = fmain.get_normalizer( + cast(Literal["vahadane", "macenko"], method), + self.random_generator, + ) + + self.preset_names = [ + "ruifrok", + "macenko", + "standard", + "high_contrast", + "h_heavy", + "e_heavy", + "dark", + "light", + ] def get_stain_matrix(self, img: np.ndarray) -> np.ndarray: """Get stain matrix based on selected method.""" if self.method == "preset": return fmain.STAIN_MATRICES[self.preset] + if self.method == "random_preset": + random_preset = self.py_random.choice(self.preset_names) + return fmain.STAIN_MATRICES[random_preset] if self.method == "custom": return self.custom_matrix # vahadane or macenko @@ -6645,7 +6665,7 @@ def apply( def get_params_dependent_on_data(self, params: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]: # Get stain matrix - image = data["image"] + image = data["image"] if "image" in data else data["images"][0] stain_matrix = self.get_stain_matrix(image) # Generate random scaling and shift parameters for both H&E channels diff --git a/tests/aug_definitions.py b/tests/aug_definitions.py index 7cacc357f..4a6f49a63 100644 --- a/tests/aug_definitions.py +++ b/tests/aug_definitions.py @@ -432,4 +432,5 @@ [A.AtLeastOneBBoxRandomCrop, {"height": 80, "width": 80, "erosion_factor": 0.2}], [A.ConstrainedCoarseDropout, {"num_holes_range": (1, 3), "hole_height_range": (0.1, 0.2), "hole_width_range": (0.1, 0.2), "fill": 0, "fill_mask": 0, "mask_indices": [1]}], [A.RandomSizedBBoxSafeCrop, {"height": 80, "width": 80, "erosion_rate": 0.2}], + [A.HEStain, {"method": "vahadane", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}], ] diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py index da174c802..8c0f5ef72 100644 --- a/tests/functional/test_functional.py +++ b/tests/functional/test_functional.py @@ -2811,3 +2811,220 @@ def test_simple_nmf_reproducibility(random_state): np.testing.assert_allclose(result1[0], result2[0]) np.testing.assert_allclose(result1[1], result2[1]) + + + +@pytest.fixture +def synthetic_he_image(): + """Create synthetic H&E-like image with known stain vectors.""" + # Define synthetic H&E stain vectors (simplified but realistic) + h_vector = np.array([0.65, 0.70, 0.29]) # Hematoxylin (purple) + e_vector = np.array([0.07, 0.99, 0.11]) # Eosin (pink) + stain_matrix = np.vstack([h_vector, e_vector]) + + # Create synthetic concentrations with distinct regions + height, width = 100, 100 + h_concentration = np.zeros((height, width)) + e_concentration = np.zeros((height, width)) + + # Hematoxylin-rich region (top half) + h_concentration[:50, :] = np.random.uniform(0.5, 1.0, (50, width)) + e_concentration[:50, :] = np.random.uniform(0, 0.5, (50, width)) + + # Eosin-rich region (bottom half) + h_concentration[50:, :] = np.random.uniform(0, 0.5, (50, width)) + 
e_concentration[50:, :] = np.random.uniform(0.5, 1.0, (50, width)) + + # Add background region (top-left corner) + h_concentration[:20, :20] = 0 + e_concentration[:20, :20] = 0 + + # Create image + concentrations = np.stack([h_concentration, e_concentration], axis=-1) + optical_density = np.dot(concentrations, stain_matrix) + img = np.exp(-optical_density) * 255 + return img.astype(np.uint8), stain_matrix, concentrations + + +@pytest.mark.parametrize( + ["normalizer_class", "kwargs"], + [ + (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(42)}), + (fmain.MacenkoNormalizer, {"random_state": np.random.RandomState(42), "angular_percentile": 99}), + ] +) +def test_normalizer_output_shape(normalizer_class, kwargs, synthetic_he_image): + """Test that normalizers produce correctly shaped output.""" + img = synthetic_he_image[0] + normalizer = normalizer_class(**kwargs) + normalizer.fit(img) + + assert normalizer.stain_matrix_target.shape == (2, 3) + assert np.all(normalizer.stain_matrix_target >= 0) + assert np.allclose( + np.sum(normalizer.stain_matrix_target**2, axis=1), + np.ones(2), + rtol=1e-5 + ) + +@pytest.mark.parametrize( + ["normalizer_class", "kwargs"], + [ + (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(137)}), + (fmain.MacenkoNormalizer, { + "random_state": np.random.RandomState(137), + "angular_percentile": 99 + }), + ] +) +def test_normalizer_consistency(normalizer_class, kwargs, synthetic_he_image): + """Test that normalizers produce consistent results.""" + img, stain_matrix, _ = synthetic_he_image # Unpack 3 values + + # Create two normalizers with SAME random state + rs1 = np.random.RandomState(137) # Explicit seed + rs2 = np.random.RandomState(137) # Same seed + + kwargs1 = {**kwargs, "random_state": rs1} + kwargs2 = {**kwargs, "random_state": rs2} + + normalizer1 = normalizer_class(**kwargs1) + normalizer2 = normalizer_class(**kwargs2) + + normalizer1.fit(img) + normalizer2.fit(img) + + np.testing.assert_allclose( + normalizer1.stain_matrix_target, + normalizer2.stain_matrix_target, + rtol=1e-5 + ) + +@pytest.mark.parametrize( + ["normalizer_class", "kwargs", "angle_tolerance"], + [ + (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(42)}, 45), + (fmain.MacenkoNormalizer, { + "random_state": np.random.RandomState(42), + "angular_percentile": 99 + }, 45), + ] +) +def test_normalizer_stain_separation(normalizer_class, kwargs, angle_tolerance, synthetic_he_image): + """Test that normalizers correctly separate H&E stains.""" + img, stain_matrix, _ = synthetic_he_image # Unpack 3 values + normalizer = normalizer_class(**kwargs) + normalizer.fit(img) + + # Calculate angles between true and estimated stain vectors + for i in range(2): + cos_angle = np.dot(normalizer.stain_matrix_target[i], stain_matrix[i]) + cos_angle /= np.linalg.norm(normalizer.stain_matrix_target[i]) + cos_angle /= np.linalg.norm(stain_matrix[i]) + angle = np.arccos(np.clip(cos_angle, -1.0, 1.0)) + angle_degrees = np.degrees(angle) + assert angle_degrees < angle_tolerance + + +@pytest.mark.parametrize( + "angular_percentile", + [0, 50, 90, 99, 99.9] +) +def test_macenko_angular_percentile(angular_percentile, synthetic_he_image): + """Test MacenkoNormalizer with different angular percentiles.""" + img = synthetic_he_image[0] # Unpack 3 values + normalizer = fmain.MacenkoNormalizer( + random_state=np.random.RandomState(42), + angular_percentile=angular_percentile + ) + normalizer.fit(img) + + assert normalizer.stain_matrix_target.shape == (2, 3) + assert 
np.all(normalizer.stain_matrix_target >= 0) + + +@pytest.mark.parametrize( + ["scale_factors", "shift_values", "expected_changes"], + [ + # Increase H only + ( + np.array([2.0, 1.0], dtype=np.float32), + np.array([0.0, 0.0], dtype=np.float32), + {"h_change": "increase", "e_change": "stable"} + ), + # Decrease both + ( + np.array([0.5, 0.5], dtype=np.float32), + np.array([0.0, 0.0], dtype=np.float32), + {"h_change": "decrease", "e_change": "decrease"} + ), + ] +) +def test_stain_augmentation_effects(synthetic_he_image, scale_factors, shift_values, expected_changes): + """Test that augmentation produces expected changes in stain intensities.""" + img, stain_matrix, original_concentrations = synthetic_he_image + + # Ensure stain_matrix is float32 + stain_matrix = stain_matrix.astype(np.float32) + + result = fmain.apply_he_stain_augmentation( + img=img, + stain_matrix=stain_matrix, + scale_factors=scale_factors, + shift_values=shift_values, + augment_background=True + ) + + # Calculate changes in optical density + def to_od(x): + return -np.log((x.astype(np.float32) + 1) / 256) + + od_orig = to_od(img) + od_result = to_od(result) + + # Exclude background pixels (where OD is very low) + tissue_mask = np.any(od_orig > 0.15, axis=2) + + h_region_mask = tissue_mask[:50, 20:] + e_region_mask = tissue_mask[50:, 20:] + + h_change = (np.mean(od_result[:50, 20:][h_region_mask]) - + np.mean(od_orig[:50, 20:][h_region_mask])) + e_change = (np.mean(od_result[50:, 20:][e_region_mask]) - + np.mean(od_orig[50:, 20:][e_region_mask])) + + # Check direction of changes + if expected_changes["h_change"] == "increase": + assert h_change > 0, "H stain should increase" + elif expected_changes["h_change"] == "decrease": + assert h_change < 0, "H stain should decrease" + else: # stable + np.testing.assert_allclose(h_change, 0, atol=0.15) + + if expected_changes["e_change"] == "increase": + assert e_change > 0, "E stain should increase" + elif expected_changes["e_change"] == "decrease": + assert e_change < 0, "E stain should decrease" + else: # stable + np.testing.assert_allclose(e_change, 0, atol=0.15) + + +def test_background_augmentation(synthetic_he_image): + """Test that background augmentation flag works correctly.""" + img, stain_matrix, _ = synthetic_he_image + + result = fmain.apply_he_stain_augmentation( + img=img, + stain_matrix=stain_matrix, + scale_factors=np.array([2.0, 2.0]), + shift_values=np.array([0.1, 0.1]), + augment_background=False + ) + + # Background region should be unchanged + np.testing.assert_allclose( + result[:20, :20], + img[:20, :20], + rtol=1e-5, + atol=1 + ) From b6eae68d85fe7226dfefd0505664d8db8bd71d9f Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 14:01:59 -0800 Subject: [PATCH 3/7] Cleanup --- albumentations/augmentations/functional.py | 29 +++++++++------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/albumentations/augmentations/functional.py b/albumentations/augmentations/functional.py index 4efb0296f..4434bff61 100644 --- a/albumentations/augmentations/functional.py +++ b/albumentations/augmentations/functional.py @@ -3368,29 +3368,24 @@ def get_tissue_mask(img: np.ndarray, threshold: float = 0.85) -> np.ndarray: """Get binary mask of tissue regions based on luminosity. Args: - img: RGB image + img: RGB image in float32 format, range [0, 1] threshold: Luminosity threshold. Pixels with luminosity below this value are considered tissue. Range: 0 to 1. 
Default: 0.85 Returns: Binary mask where True indicates tissue regions """ - # Convert to grayscale using OpenCV's weighted formula - luminosity = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) - - # Apply threshold (cv2.THRESH_BINARY_INV because tissue is darker) - _, mask = cv2.threshold( - src=luminosity, - thresh=int(threshold * 255), - maxval=1, - type=cv2.THRESH_BINARY_INV, - ) + # Convert to grayscale using RGB weights: R*0.299 + G*0.587 + B*0.114 + luminosity = img[..., 0] * 0.299 + img[..., 1] * 0.587 + img[..., 2] * 0.114 + + # Tissue is darker, so we want pixels below threshold + mask = luminosity < threshold - return mask.reshape(-1).astype(bool) + return mask.reshape(-1) @clipped -@uint8_io +@float32_io def apply_he_stain_augmentation( img: np.ndarray, stain_matrix: np.ndarray, @@ -3399,9 +3394,9 @@ def apply_he_stain_augmentation( augment_background: bool, ) -> np.ndarray: # Step 1: Convert to optical density - pixel_matrix = img.reshape(-1, 3).astype(np.float32) - pixel_matrix = np.maximum(pixel_matrix, 1) - optical_density = -np.log(pixel_matrix / 255.0) + pixel_matrix = img.reshape(-1, 3) + pixel_matrix = np.maximum(pixel_matrix, 1e-6) + optical_density = -cv2.log(pixel_matrix) # Step 2: Calculate concentrations stain_matrix = np.ascontiguousarray(stain_matrix, dtype=np.float32) @@ -3420,6 +3415,6 @@ def apply_he_stain_augmentation( # Step 4: Reconstruct image od_result = concentrations @ stain_matrix - rgb_result = 255.0 * np.exp(-od_result) + rgb_result = np.exp(-od_result) return rgb_result.reshape(img.shape) From 807669a394a23a037a8b6152f65cc6fdbda0f867 Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 15:45:12 -0800 Subject: [PATCH 4/7] Tests pass --- albumentations/augmentations/functional.py | 180 ++++++++++++--------- albumentations/augmentations/transforms.py | 60 +++---- tests/aug_definitions.py | 7 +- tests/functional/test_functional.py | 69 +------- tests/test_augmentations.py | 7 +- tests/test_core.py | 2 +- 6 files changed, 158 insertions(+), 167 deletions(-) diff --git a/albumentations/augmentations/functional.py b/albumentations/augmentations/functional.py index 4434bff61..49b352f97 100644 --- a/albumentations/augmentations/functional.py +++ b/albumentations/augmentations/functional.py @@ -3191,17 +3191,16 @@ def get_mud_params( } -def get_normalizer(method: Literal["vahadane", "macenko"], random_state: np.random.RandomState) -> StainNormalizer: +def get_normalizer(method: Literal["vahadane", "macenko"]) -> StainNormalizer: """Get stain normalizer based on method.""" - return VahadaneNormalizer(random_state) if method == "vahadane" else MacenkoNormalizer(random_state) + return VahadaneNormalizer() if method == "vahadane" else MacenkoNormalizer() class StainNormalizer: """Base class for stain normalizers.""" - def __init__(self, random_state: np.random.RandomState) -> None: + def __init__(self) -> None: self.stain_matrix_target = None - self.random_state = random_state def fit(self, img: np.ndarray) -> None: """Extract stain matrix from image.""" @@ -3209,81 +3208,96 @@ def fit(self, img: np.ndarray) -> None: class SimpleNMF: - """Simple Non-negative Matrix Factorization for H&E stain separation. - - This is a simplified implementation specifically for H&E staining, - where we know we want exactly 2 components (H&E) and have RGB data. 
- """ - - def __init__(self, random_state: np.random.RandomState, n_iter: int = 100): + def __init__(self, n_iter: int = 100): self.n_iter = n_iter - self.random_state = random_state + # Initialize with standard H&E colors from Ruifrok + self.initial_colors = np.array( + [ + [0.644211, 0.716556, 0.266844], # Hematoxylin + [0.092789, 0.954111, 0.283111], # Eosin + ], + dtype=np.float32, + ) def fit_transform(self, optical_density: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - """Decompose optical density matrix into stain concentrations and colors. - - Args: - optical_density: Input matrix of shape (n_pixels, n_channels) - Optical density values of shape (n_pixels, 3) for RGB - - Returns: - stain_concentrations: Matrix of shape (n_pixels, 2) for H&E concentrations - stain_colors: Matrix of shape (2, 3) for RGB colors of each stain - """ - n_pixels, n_channels = optical_density.shape - n_stains = 2 # H&E stains + # Start with known H&E colors + stain_colors = self.initial_colors.copy() - # Initialize random matrices - stain_concentrations = self.random_state.uniform(0, 1, (n_pixels, n_stains)) - stain_colors = self.random_state.uniform(0, 1, (n_stains, n_channels)) + # Initialize concentrations based on projection onto initial colors + # This gives us a physically meaningful starting point + stain_colors_normalized = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + stain_concentrations = np.maximum(optical_density @ stain_colors_normalized.T, 0) - # Normalize stain colors to unit norm - stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) - - # Iterative update rules - eps = 1e-7 # Small number to prevent division by zero + # Iterative updates with careful normalization + eps = 1e-6 for _ in range(self.n_iter): # Update concentrations - conc_numerator = np.dot(optical_density, stain_colors.T) - conc_denominator = np.dot(stain_concentrations, np.dot(stain_colors, stain_colors.T)) + eps - stain_concentrations *= conc_numerator / conc_denominator + numerator = optical_density @ stain_colors.T + denominator = stain_concentrations @ (stain_colors @ stain_colors.T) + stain_concentrations *= numerator / (denominator + eps) + + # Ensure non-negativity + stain_concentrations = np.maximum(stain_concentrations, 0) # Update colors - color_numerator = np.dot(stain_concentrations.T, optical_density) - color_denominator = np.dot(np.dot(stain_concentrations.T, stain_concentrations), stain_colors) + eps - stain_colors *= color_numerator / color_denominator + numerator = stain_concentrations.T @ optical_density + denominator = (stain_concentrations.T @ stain_concentrations) @ stain_colors + stain_colors *= numerator / (denominator + eps) - # Normalize stain colors to unit norm - stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + # Ensure non-negativity and normalize + stain_colors = np.maximum(stain_colors, 0) + stain_colors /= np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) return stain_concentrations, stain_colors -class VahadaneNormalizer(StainNormalizer): - """Vahadane stain normalizer using simplified NMF.""" +def order_stains_combined(stain_colors: np.ndarray) -> tuple[int, int]: + """Order stains using a combination of methods. + This combines both angular information and spectral characteristics + for more robust identification. 
+ """ + # Normalize stain vectors + stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + + # Calculate angles (Macenko) + angles = np.mod(np.arctan2(stain_colors[:, 1], stain_colors[:, 0]), np.pi) + + # Calculate spectral ratios (Ruifrok) + blue_ratio = stain_colors[:, 2] / (np.sum(stain_colors, axis=1) + 1e-6) + red_ratio = stain_colors[:, 0] / (np.sum(stain_colors, axis=1) + 1e-6) + + # Combine scores + # High angle and high blue ratio indicates Hematoxylin + # Low angle and high red ratio indicates Eosin + scores = angles * blue_ratio - red_ratio + + hematoxylin_idx = np.argmax(scores) + eosin_idx = 1 - hematoxylin_idx + + return hematoxylin_idx, eosin_idx + + +class VahadaneNormalizer(StainNormalizer): def fit(self, img: np.ndarray) -> None: - """Extract H&E stain matrix using Vahadane's method. + max_value = MAX_VALUES_BY_DTYPE[img.dtype] + + pixel_matrix = img.reshape((-1, 3)).astype(np.float32) + + pixel_matrix = np.maximum(pixel_matrix / max_value, 1e-6) - Args: - img: RGB image of shape (height, width, 3) - """ - # Convert to optical density - optical_density = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) + optical_density = -np.log(pixel_matrix) - # Apply NMF to get stain concentrations and colors - nmf = SimpleNMF(self.random_state, n_iter=100) + nmf = SimpleNMF(n_iter=100) _, stain_colors = nmf.fit_transform(optical_density) - # Order H&E components by their angles in the color plane - color_angles = np.arctan2(stain_colors[:, 1], stain_colors[:, 0]) - hematoxylin_idx = np.argmax(color_angles) - eosin_idx = 1 - hematoxylin_idx + # Use combined method for robust stain ordering + hematoxylin_idx, eosin_idx = order_stains_combined(stain_colors) self.stain_matrix_target = np.array( [ - stain_colors[hematoxylin_idx], # Hematoxylin stain color - stain_colors[eosin_idx], # Eosin stain color + stain_colors[hematoxylin_idx], + stain_colors[eosin_idx], ], ) @@ -3291,16 +3305,17 @@ def fit(self, img: np.ndarray) -> None: class MacenkoNormalizer(StainNormalizer): """Macenko stain normalizer with optimized computations.""" - def __init__(self, random_state: np.random.RandomState, angular_percentile: float = 99): - super().__init__(random_state) + def __init__(self, angular_percentile: float = 99): + super().__init__() self.angular_percentile = angular_percentile def fit(self, img: np.ndarray, angular_percentile: float = 99) -> None: """Extract H&E stain matrix using optimized Macenko's method.""" + max_value = MAX_VALUES_BY_DTYPE[img.dtype] # Step 1: Convert RGB to optical density (OD) space pixel_matrix = img.reshape(-1, 3).astype(np.float32) - pixel_matrix = np.maximum(pixel_matrix, 1) - optical_density = -np.log(pixel_matrix / 255.0) + pixel_matrix = np.maximum(pixel_matrix / max_value, 1e-6) + optical_density = -np.log(pixel_matrix) # Step 2: Remove background pixels od_threshold = 0.05 @@ -3393,28 +3408,41 @@ def apply_he_stain_augmentation( shift_values: np.ndarray, augment_background: bool, ) -> np.ndarray: - # Step 1: Convert to optical density - pixel_matrix = img.reshape(-1, 3) - pixel_matrix = np.maximum(pixel_matrix, 1e-6) - optical_density = -cv2.log(pixel_matrix) + # Step 1: Convert RGB to optical density space + rgb_pixels = img.reshape(-1, 3) + rgb_pixels_clipped = np.maximum(rgb_pixels, 1e-6) # Prevent log(0) + optical_density = -np.log(rgb_pixels_clipped) - # Step 2: Calculate concentrations + # Step 2: Calculate stain concentrations using regularized pseudo-inverse stain_matrix = np.ascontiguousarray(stain_matrix, 
dtype=np.float32) - concentrations = np.linalg.solve( - stain_matrix @ stain_matrix.T, - stain_matrix @ optical_density.T, - ).T - # Step 3: Apply scaling and shifting + # Add small regularization term for numerical stability + regularization = 1e-6 + stain_correlation = stain_matrix @ stain_matrix.T + regularization * np.eye(2) + density_projection = stain_matrix @ optical_density.T + + try: + # Solve for stain concentrations + stain_concentrations = np.linalg.solve(stain_correlation, density_projection).T + except np.linalg.LinAlgError: + # Fallback to pseudo-inverse if direct solve fails + stain_concentrations = np.linalg.lstsq( + stain_matrix.T, + optical_density, + rcond=regularization, + )[0].T + + # Step 3: Apply concentration adjustments if not augment_background: + # Only modify tissue regions tissue_mask = get_tissue_mask(img).reshape(-1) - concentrations[tissue_mask] = concentrations[tissue_mask] * scale_factors + shift_values + stain_concentrations[tissue_mask] = stain_concentrations[tissue_mask] * scale_factors + shift_values else: - # Use numpy operations instead of cv2 - concentrations = concentrations * scale_factors + shift_values + # Modify all pixels + stain_concentrations = stain_concentrations * scale_factors + shift_values - # Step 4: Reconstruct image - od_result = concentrations @ stain_matrix - rgb_result = np.exp(-od_result) + # Step 4: Reconstruct RGB image + optical_density_result = stain_concentrations @ stain_matrix + rgb_result = np.exp(-optical_density_result) return rgb_result.reshape(img.shape) diff --git a/albumentations/augmentations/transforms.py b/albumentations/augmentations/transforms.py index de68cb78d..89ea0e28d 100644 --- a/albumentations/augmentations/transforms.py +++ b/albumentations/augmentations/transforms.py @@ -6501,7 +6501,6 @@ class HEStain(ImageOnlyTransform): - "random_preset": Randomly select a preset matrix each time - "vahadane": Extract using Vahadane method - "macenko": Extract using Macenko method - - "custom": Use custom provided matrix Default: "preset" preset: Preset stain matrix to use when method="preset": @@ -6515,9 +6514,6 @@ class HEStain(ImageOnlyTransform): - "light": Lighter staining Default: "standard" - custom_matrix: Custom stain matrix to use when method="custom". - Shape should be (2, 3) for H&E stains. - intensity_scale_range: Range for multiplicative stain intensity variation. Values are multipliers between 0.5 and 1.5. 
For example: - (0.7, 1.3) means stain intensities will vary from 70% to 130% @@ -6538,6 +6534,9 @@ class HEStain(ImageOnlyTransform): Targets: image + Number of channels: + 3 + Image types: uint8, float32 @@ -6551,7 +6550,7 @@ class HEStain(ImageOnlyTransform): """ class InitSchema(BaseTransformInitSchema): - method: Literal["preset", "random_preset", "vahadane", "macenko", "custom"] + method: Literal["preset", "random_preset", "vahadane", "macenko"] preset: ( Literal[ "ruifrok", @@ -6565,7 +6564,6 @@ class InitSchema(BaseTransformInitSchema): ] | None ) - custom_matrix: np.ndarray | None intensity_scale_range: Annotated[ tuple[float, float], AfterValidator(nondecreasing), @@ -6584,15 +6582,11 @@ def validate_matrix_selection(self) -> Self: self.preset = "standard" elif self.method == "random_preset" and self.preset is not None: raise ValueError("preset should not be specified when method='random_preset'") - elif self.method == "custom" and self.custom_matrix is None: - raise ValueError("custom_matrix must be provided when method='custom'") - elif self.method == "custom" and self.custom_matrix is not None and self.custom_matrix.shape != (2, 3): - raise ValueError("custom_matrix must have shape (2, 3)") return self def __init__( self, - method: Literal["preset", "random_preset", "vahadane", "macenko", "custom"] = "random_preset", + method: Literal["preset", "random_preset", "vahadane", "macenko"] = "random_preset", preset: Literal[ "ruifrok", "macenko", @@ -6602,8 +6596,8 @@ def __init__( "e_heavy", "dark", "light", - ] = "standard", - custom_matrix: np.ndarray | None = None, + ] + | None = None, intensity_scale_range: tuple[float, float] = (0.7, 1.3), intensity_shift_range: tuple[float, float] = (-0.2, 0.2), augment_background: bool = False, @@ -6612,15 +6606,15 @@ def __init__( super().__init__(p=p) self.method = method self.preset = preset - self.custom_matrix = custom_matrix self.intensity_scale_range = intensity_scale_range self.intensity_shift_range = intensity_shift_range self.augment_background = augment_background + self.stain_normalizer = None + # Initialize stain extractor here if needed if method in ["vahadane", "macenko"]: self.stain_extractor = fmain.get_normalizer( cast(Literal["vahadane", "macenko"], method), - self.random_generator, ) self.preset_names = [ @@ -6636,13 +6630,11 @@ def __init__( def get_stain_matrix(self, img: np.ndarray) -> np.ndarray: """Get stain matrix based on selected method.""" - if self.method == "preset": + if self.method == "preset" and self.preset is not None: return fmain.STAIN_MATRICES[self.preset] if self.method == "random_preset": random_preset = self.py_random.choice(self.preset_names) return fmain.STAIN_MATRICES[random_preset] - if self.method == "custom": - return self.custom_matrix # vahadane or macenko self.stain_extractor.fit(img) return self.stain_extractor.stain_matrix_target @@ -6655,6 +6647,7 @@ def apply( shift_values: np.ndarray, **params: Any, ) -> np.ndarray: + non_rgb_error(img) return fmain.apply_he_stain_augmentation( img=img, stain_matrix=stain_matrix, @@ -6663,9 +6656,22 @@ def apply( augment_background=self.augment_background, ) + @batch_transform("channel", has_batch_dim=True, has_depth_dim=False) + def apply_to_images(self, images: np.ndarray, **params: Any) -> np.ndarray: + return self.apply(images, **params) + + @batch_transform("channel", has_batch_dim=False, has_depth_dim=True) + def apply_to_volume(self, volume: np.ndarray, **params: Any) -> np.ndarray: + return self.apply(volume, **params) + + 
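Review note, not part of the patch: a rough numpy sketch of what happens to the sampled `scale_factors` and `shift_values` downstream in `apply_he_stain_augmentation` (Beer-Lambert round trip on a float image in [0, 1]). The image and stain matrix below are synthetic stand-ins, and the sketch ignores the tissue mask and the regularization term used in the actual implementation.

import numpy as np

rng = np.random.default_rng(0)
img = rng.uniform(0.2, 1.0, (64, 64, 3)).astype(np.float32)  # synthetic float RGB tile
stain_matrix = np.array(
    [[0.65, 0.70, 0.29],   # hematoxylin (the "standard" preset)
     [0.07, 0.99, 0.11]],  # eosin
    dtype=np.float32,
)
scale_factors = np.array([1.2, 0.9], dtype=np.float32)    # multiplicative H/E change
shift_values = np.array([0.05, -0.05], dtype=np.float32)  # additive H/E change

od = -np.log(np.maximum(img.reshape(-1, 3), 1e-6))         # Beer-Lambert: RGB -> optical density
conc = np.linalg.solve(stain_matrix @ stain_matrix.T,      # least-squares stain concentrations
                       stain_matrix @ od.T).T
conc = conc * scale_factors + shift_values                 # per-stain augmentation
rgb = np.exp(-(conc @ stain_matrix)).reshape(img.shape)    # optical density -> RGB
augmented = np.clip(rgb, 0.0, 1.0)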
@batch_transform("channel", has_batch_dim=True, has_depth_dim=True) + def apply_to_volumes(self, volumes: np.ndarray, **params: Any) -> np.ndarray: + return self.apply(volumes, **params) + def get_params_dependent_on_data(self, params: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]: # Get stain matrix image = data["image"] if "image" in data else data["images"][0] + stain_matrix = self.get_stain_matrix(image) # Generate random scaling and shift parameters for both H&E channels @@ -6688,12 +6694,12 @@ def get_params_dependent_on_data(self, params: dict[str, Any], data: dict[str, A "shift_values": shift_values, } - def get_transform_init_args_names(self) -> tuple[str, ...]: - return ( - "method", - "preset", - "custom_matrix", - "intensity_scale_range", - "intensity_shift_range", - "augment_background", - ) + def get_transform_init_args(self) -> dict[str, Any]: + """Return dictionary with transform init arguments.""" + return { + "method": self.method, + "preset": self.preset, + "intensity_scale_range": self.intensity_scale_range, + "intensity_shift_range": self.intensity_shift_range, + "augment_background": self.augment_background, + } diff --git a/tests/aug_definitions.py b/tests/aug_definitions.py index 4a6f49a63..addbfe543 100644 --- a/tests/aug_definitions.py +++ b/tests/aug_definitions.py @@ -432,5 +432,10 @@ [A.AtLeastOneBBoxRandomCrop, {"height": 80, "width": 80, "erosion_factor": 0.2}], [A.ConstrainedCoarseDropout, {"num_holes_range": (1, 3), "hole_height_range": (0.1, 0.2), "hole_width_range": (0.1, 0.2), "fill": 0, "fill_mask": 0, "mask_indices": [1]}], [A.RandomSizedBBoxSafeCrop, {"height": 80, "width": 80, "erosion_rate": 0.2}], - [A.HEStain, {"method": "vahadane", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}], + [A.HEStain, [ + {"method": "vahadane", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, + {"method": "macenko", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, + {"method": "random_preset", + "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, + ]], ] diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py index 8c0f5ef72..a2bfcab35 100644 --- a/tests/functional/test_functional.py +++ b/tests/functional/test_functional.py @@ -2743,7 +2743,7 @@ def test_simple_nmf_shape(height, width, n_iter, random_state): od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) # Our implementation - nmf = fmain.SimpleNMF(random_state=random_state, n_iter=n_iter) + nmf = fmain.SimpleNMF(n_iter=n_iter) concentrations, colors = nmf.fit_transform(od) # Check shapes @@ -2774,7 +2774,7 @@ def test_simple_nmf_against_sklearn(height, width, random_state): od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) # Our implementation - our_nmf = fmain.SimpleNMF(random_state=random_state, n_iter=100) + our_nmf = fmain.SimpleNMF(n_iter=100) our_concentrations, our_colors = our_nmf.fit_transform(od) # Sklearn implementation @@ -2798,22 +2798,6 @@ def test_simple_nmf_against_sklearn(height, width, random_state): assert our_error <= 2 * sklearn_error -def test_simple_nmf_reproducibility(random_state): - # Test that same random state produces same results - img = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8) - od = -np.log((img.reshape((-1, 3)).astype(np.float32) + 1) / 256) - - nmf1 = 
fmain.SimpleNMF(random_state=np.random.RandomState(42)) - nmf2 = fmain.SimpleNMF(random_state=np.random.RandomState(42)) - - result1 = nmf1.fit_transform(od) - result2 = nmf2.fit_transform(od) - - np.testing.assert_allclose(result1[0], result2[0]) - np.testing.assert_allclose(result1[1], result2[1]) - - - @pytest.fixture def synthetic_he_image(): """Create synthetic H&E-like image with known stain vectors.""" @@ -2849,8 +2833,8 @@ def synthetic_he_image(): @pytest.mark.parametrize( ["normalizer_class", "kwargs"], [ - (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(42)}), - (fmain.MacenkoNormalizer, {"random_state": np.random.RandomState(42), "angular_percentile": 99}), + (fmain.VahadaneNormalizer, {}), + (fmain.MacenkoNormalizer, { "angular_percentile": 99 }), ] ) def test_normalizer_output_shape(normalizer_class, kwargs, synthetic_he_image): @@ -2867,47 +2851,11 @@ def test_normalizer_output_shape(normalizer_class, kwargs, synthetic_he_image): rtol=1e-5 ) -@pytest.mark.parametrize( - ["normalizer_class", "kwargs"], - [ - (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(137)}), - (fmain.MacenkoNormalizer, { - "random_state": np.random.RandomState(137), - "angular_percentile": 99 - }), - ] -) -def test_normalizer_consistency(normalizer_class, kwargs, synthetic_he_image): - """Test that normalizers produce consistent results.""" - img, stain_matrix, _ = synthetic_he_image # Unpack 3 values - - # Create two normalizers with SAME random state - rs1 = np.random.RandomState(137) # Explicit seed - rs2 = np.random.RandomState(137) # Same seed - - kwargs1 = {**kwargs, "random_state": rs1} - kwargs2 = {**kwargs, "random_state": rs2} - - normalizer1 = normalizer_class(**kwargs1) - normalizer2 = normalizer_class(**kwargs2) - - normalizer1.fit(img) - normalizer2.fit(img) - - np.testing.assert_allclose( - normalizer1.stain_matrix_target, - normalizer2.stain_matrix_target, - rtol=1e-5 - ) - @pytest.mark.parametrize( ["normalizer_class", "kwargs", "angle_tolerance"], [ - (fmain.VahadaneNormalizer, {"random_state": np.random.RandomState(42)}, 45), - (fmain.MacenkoNormalizer, { - "random_state": np.random.RandomState(42), - "angular_percentile": 99 - }, 45), + (fmain.VahadaneNormalizer, {}, 45), + (fmain.MacenkoNormalizer, { "angular_percentile": 99 }, 45), ] ) def test_normalizer_stain_separation(normalizer_class, kwargs, angle_tolerance, synthetic_he_image): @@ -2932,9 +2880,8 @@ def test_normalizer_stain_separation(normalizer_class, kwargs, angle_tolerance, ) def test_macenko_angular_percentile(angular_percentile, synthetic_he_image): """Test MacenkoNormalizer with different angular percentiles.""" - img = synthetic_he_image[0] # Unpack 3 values + img = synthetic_he_image[0] normalizer = fmain.MacenkoNormalizer( - random_state=np.random.RandomState(42), angular_percentile=angular_percentile ) normalizer.fit(img) @@ -2962,7 +2909,7 @@ def test_macenko_angular_percentile(angular_percentile, synthetic_he_image): ) def test_stain_augmentation_effects(synthetic_he_image, scale_factors, shift_values, expected_changes): """Test that augmentation produces expected changes in stain intensities.""" - img, stain_matrix, original_concentrations = synthetic_he_image + img, stain_matrix = synthetic_he_image[:2] # Ensure stain_matrix is float32 stain_matrix = stain_matrix.astype(np.float32) diff --git a/tests/test_augmentations.py b/tests/test_augmentations.py index 7b95e74e6..3795d247d 100644 --- a/tests/test_augmentations.py +++ b/tests/test_augmentations.py @@ -319,7 +319,8 @@ def 
test_augmentations_wont_change_float_input(augmentation_cls, params): A.RandomGravel, A.RandomSunFlare, A.RandomFog, - A.Pad + A.Pad, + A.HEStain, }, ), ) @@ -515,6 +516,7 @@ def test_mask_fill_value(augmentation_cls, params): A.RandomFog, A.Equalize, A.GridElasticDeform, + A.HEStain, }, ), ) @@ -591,6 +593,7 @@ def test_multichannel_image_augmentations(augmentation_cls, params): A.RandomSunFlare, A.RandomFog, A.GridElasticDeform, + A.HEStain, }, ), ) @@ -664,6 +667,7 @@ def test_float_multichannel_image_augmentations(augmentation_cls, params): A.RandomFog, A.Equalize, A.GridElasticDeform, + A.HEStain, }, ), ) @@ -740,6 +744,7 @@ def test_multichannel_image_augmentations_diff_channels(augmentation_cls, params A.RandomSunFlare, A.RandomFog, A.GridElasticDeform, + A.HEStain, }, ), ) diff --git a/tests/test_core.py b/tests/test_core.py index 4d03955e1..25ebee2a6 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1038,7 +1038,7 @@ def test_images_as_target(augmentation_cls, params, as_array, shape): if augmentation_cls in {A.ChannelDropout, A.Spatter, A.ISONoise, A.RandomGravel, A.ChromaticAberration, A.PlanckianJitter, A.PixelDistributionAdaptation, A.MaskDropout, A.ConstrainedCoarseDropout, A.ChannelShuffle, A.ToRGB, A.RandomSunFlare, - A.RandomFog, A.RandomSnow, A.RandomRain}: + A.RandomFog, A.RandomSnow, A.RandomRain, A.HEStain}: pytest.skip("ChannelDropout is not applicable to grayscale images") From 657aad13a58a74efc20cd8d608f5644035de440d Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 16:00:02 -0800 Subject: [PATCH 5/7] Added sklearn to tests --- tests/aug_definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/aug_definitions.py b/tests/aug_definitions.py index addbfe543..3e0105e51 100644 --- a/tests/aug_definitions.py +++ b/tests/aug_definitions.py @@ -433,7 +433,7 @@ [A.ConstrainedCoarseDropout, {"num_holes_range": (1, 3), "hole_height_range": (0.1, 0.2), "hole_width_range": (0.1, 0.2), "fill": 0, "fill_mask": 0, "mask_indices": [1]}], [A.RandomSizedBBoxSafeCrop, {"height": 80, "width": 80, "erosion_rate": 0.2}], [A.HEStain, [ - {"method": "vahadane", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, + {"method": "vahadane", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": False}, {"method": "macenko", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, {"method": "random_preset", "intensity_scale_range": (0.5, 1.5), "intensity_shift_range": (-0.1, 0.1), "augment_background": True}, From 490df6e57d4cd7589ebf1f883cae145dfd123a1b Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 2025 16:07:05 -0800 Subject: [PATCH 6/7] Refactoring --- albumentations/augmentations/functional.py | 35 +++++++++++----------- requirements-dev.txt | 1 + 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/albumentations/augmentations/functional.py b/albumentations/augmentations/functional.py index 49b352f97..bfb7ae196 100644 --- a/albumentations/augmentations/functional.py +++ b/albumentations/augmentations/functional.py @@ -3191,6 +3191,18 @@ def get_mud_params( } +def rgb_to_optical_density(img: np.ndarray, eps: float = 1e-6) -> np.ndarray: + max_value = MAX_VALUES_BY_DTYPE[img.dtype] + pixel_matrix = img.reshape(-1, 3).astype(np.float32) + pixel_matrix = np.maximum(pixel_matrix / max_value, eps) + return -np.log(pixel_matrix) + + +def normalize_vectors(vectors: 
np.ndarray) -> np.ndarray: + norms = np.sqrt(np.sum(vectors**2, axis=1, keepdims=True)) + return vectors / norms + + def get_normalizer(method: Literal["vahadane", "macenko"]) -> StainNormalizer: """Get stain normalizer based on method.""" return VahadaneNormalizer() if method == "vahadane" else MacenkoNormalizer() @@ -3225,7 +3237,7 @@ def fit_transform(self, optical_density: np.ndarray) -> tuple[np.ndarray, np.nda # Initialize concentrations based on projection onto initial colors # This gives us a physically meaningful starting point - stain_colors_normalized = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + stain_colors_normalized = normalize_vectors(stain_colors) stain_concentrations = np.maximum(optical_density @ stain_colors_normalized.T, 0) # Iterative updates with careful normalization @@ -3246,7 +3258,7 @@ def fit_transform(self, optical_density: np.ndarray) -> tuple[np.ndarray, np.nda # Ensure non-negativity and normalize stain_colors = np.maximum(stain_colors, 0) - stain_colors /= np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + stain_colors = normalize_vectors(stain_colors) return stain_concentrations, stain_colors @@ -3258,7 +3270,7 @@ def order_stains_combined(stain_colors: np.ndarray) -> tuple[int, int]: for more robust identification. """ # Normalize stain vectors - stain_colors = stain_colors / np.sqrt(np.sum(stain_colors**2, axis=1, keepdims=True)) + stain_colors = normalize_vectors(stain_colors) # Calculate angles (Macenko) angles = np.mod(np.arctan2(stain_colors[:, 1], stain_colors[:, 0]), np.pi) @@ -3280,13 +3292,7 @@ def order_stains_combined(stain_colors: np.ndarray) -> tuple[int, int]: class VahadaneNormalizer(StainNormalizer): def fit(self, img: np.ndarray) -> None: - max_value = MAX_VALUES_BY_DTYPE[img.dtype] - - pixel_matrix = img.reshape((-1, 3)).astype(np.float32) - - pixel_matrix = np.maximum(pixel_matrix / max_value, 1e-6) - - optical_density = -np.log(pixel_matrix) + optical_density = rgb_to_optical_density(img) nmf = SimpleNMF(n_iter=100) _, stain_colors = nmf.fit_transform(optical_density) @@ -3311,11 +3317,8 @@ def __init__(self, angular_percentile: float = 99): def fit(self, img: np.ndarray, angular_percentile: float = 99) -> None: """Extract H&E stain matrix using optimized Macenko's method.""" - max_value = MAX_VALUES_BY_DTYPE[img.dtype] # Step 1: Convert RGB to optical density (OD) space - pixel_matrix = img.reshape(-1, 3).astype(np.float32) - pixel_matrix = np.maximum(pixel_matrix / max_value, 1e-6) - optical_density = -np.log(pixel_matrix) + optical_density = rgb_to_optical_density(img) # Step 2: Remove background pixels od_threshold = 0.05 @@ -3409,9 +3412,7 @@ def apply_he_stain_augmentation( augment_background: bool, ) -> np.ndarray: # Step 1: Convert RGB to optical density space - rgb_pixels = img.reshape(-1, 3) - rgb_pixels_clipped = np.maximum(rgb_pixels, 1e-6) # Prevent log(0) - optical_density = -np.log(rgb_pixels_clipped) + optical_density = rgb_to_optical_density(img) # Step 2: Calculate stain concentrations using regularized pseudo-inverse stain_matrix = np.ascontiguousarray(stain_matrix, dtype=np.float32) diff --git a/requirements-dev.txt b/requirements-dev.txt index af219c02b..5d6ff128b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,6 +6,7 @@ pytest_cov>=5.0.0 pytest_mock>=3.14.0 requests>=2.31.0 scikit-image +scikit-learn tomli>=2.0.1 torch>=2.3.1 torchvision>=0.18.1 From 8891cb5e55138cd069236bf9f1656a1c3f04ddef Mon Sep 17 00:00:00 2001 From: Vladimir Iglovikov Date: Sun, 9 Feb 
2025 16:10:15 -0800
Subject: [PATCH 7/7] Added to Readme

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 0e2b03e43..e6c2ee67d 100644
--- a/README.md
+++ b/README.md
@@ -204,6 +204,7 @@ Pixel-level transforms will change just an input image and will leave any additi
 - [GaussNoise](https://explore.albumentations.ai/transform/GaussNoise)
 - [GaussianBlur](https://explore.albumentations.ai/transform/GaussianBlur)
 - [GlassBlur](https://explore.albumentations.ai/transform/GlassBlur)
+- [HEStain](https://explore.albumentations.ai/transform/HEStain)
 - [HistogramMatching](https://explore.albumentations.ai/transform/HistogramMatching)
 - [HueSaturationValue](https://explore.albumentations.ai/transform/HueSaturationValue)
 - [ISONoise](https://explore.albumentations.ai/transform/ISONoise)
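
Note (not part of the patches above): a minimal usage sketch of HEStain as it behaves after this series — `method` is now limited to "preset", "random_preset", "vahadane", and "macenko", `preset` defaults to None, and the transform is RGB-only (a non_rgb_error check was added to apply). The synthetic image and the Compose pipeline are illustrative, not taken from the repository.

    import numpy as np
    import albumentations as A

    # Synthetic stand-in for an H&E-stained RGB tile (illustrative only).
    image = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8)

    transform = A.Compose(
        [
            A.HEStain(
                method="vahadane",                  # or "macenko", "preset", "random_preset"
                intensity_scale_range=(0.7, 1.3),   # stain intensities vary between 70% and 130%
                intensity_shift_range=(-0.2, 0.2),
                augment_background=False,
                p=1.0,
            ),
        ]
    )

    augmented = transform(image=image)["image"]
    print(augmented.shape, augmented.dtype)  # (256, 256, 3) uint8

The keyword arguments shown here are the same set returned by the new get_transform_init_args in the diff above (method, preset, intensity_scale_range, intensity_shift_range, augment_background), so they are what round-trips through serialization.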
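
Likewise, a short sketch of the Beer-Lambert bookkeeping that the refactoring patch centralizes in rgb_to_optical_density, together with the concentration-recovery step that apply_he_stain_augmentation describes as a regularized pseudo-inverse. The rgb_to_od helper name, the hard-coded 255.0 maximum, the plain np.linalg.pinv call, and the stain vectors are simplifications made up for illustration, not the library's code or its preset matrices.

    import numpy as np

    def rgb_to_od(img: np.ndarray, eps: float = 1e-6) -> np.ndarray:
        # Beer-Lambert: OD = -log(I / I_max); eps keeps log() finite for black pixels.
        # (The library looks the maximum up per dtype; 255.0 here assumes uint8 input.)
        pixels = img.reshape(-1, 3).astype(np.float32)
        return -np.log(np.maximum(pixels / 255.0, eps))

    img = np.random.randint(0, 256, (64, 64, 3), dtype=np.uint8)

    # Illustrative H&E stain vectors (rows: hematoxylin, eosin), unit-normalized.
    stain_matrix = np.array([[0.60, 0.70, 0.30],
                             [0.10, 0.90, 0.30]], dtype=np.float32)
    stain_matrix /= np.linalg.norm(stain_matrix, axis=1, keepdims=True)

    # Model: OD is approximately concentrations @ stain_matrix, so concentrations
    # are recovered by multiplying with the (pseudo-)inverse of the stain matrix.
    od = rgb_to_od(img)                                  # shape (n_pixels, 3)
    concentrations = od @ np.linalg.pinv(stain_matrix)   # shape (n_pixels, 2)
    print(concentrations.shape)

Scaling and shifting these per-stain concentrations before projecting back through the stain matrix is what the intensity_scale_range and intensity_shift_range parameters control.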