Image Processing with ImSwitch
ImSwitch can be used to connect image processing and hardware control. There are multiple ways to do that. Here, we focus on the controller-way, which is the native way to have access on the hardware directly and process incoming frames using standard Python libraries.
For this we take an exemplary Controller, the Autofocus Controller, that takes frames and processes them to compute the sharpest plane :
Image Processing with ImSwitch
ImSwitch provides powerful integration between image processing and hardware control. This tutorial shows you how to create custom controllers that process live camera feeds and control hardware in real-time.
Overview
ImSwitch offers multiple ways to integrate image processing:
- Controller-based Processing: Native ImSwitch controllers with direct hardware access
- Plugin-based Processing: Modular processing plugins
- External Integration: Using ImSwitch API from external processing tools
- Napari Integration: Advanced image analysis workflows
Autofocus Controller Tutorial
Based on the AutofocusController, here's how to create image processing controllers:
Basic Controller Structure
from imswitch import IS_HEADLESS
import time
import numpy as np
import threading
from imswitch.imcommon.model import initLogger, APIExport
from ..basecontrollers import ImConWidgetController
from skimage.filters import gaussian
from imswitch.imcommon.framework import Signal
import cv2
class AutofocusController(ImConWidgetController):
"""Custom controller for autofocus functionality."""
# Signals for updating GUI
sigUpdateFocusPlot = Signal(object, object)
sigUpdateFocusValue = Signal(object)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__logger = initLogger(self)
self.isAutofocusRunning = False
# Get hardware managers
self.setupHardware()
def setupHardware(self):
"""Initialize hardware connections"""
# Get camera (detector)
if self._setupInfo.autofocus is not None:
self.cameraName = self._setupInfo.autofocus.camera
self.stageName = self._setupInfo.autofocus.positioner
else:
# Use first available devices
self.cameraName = self._master.detectorsManager.getAllDeviceNames()[0]
self.stageName = self._master.positionersManager.getAllDeviceNames()[0]
# Get hardware managers
self.camera = self._master.detectorsManager[self.cameraName]
self.stage = self._master.positionersManager[self.stageName]
# Get laser manager (optional)
if self._master.lasersManager.getAllDeviceNames():
self.laserName = self._master.lasersManager.getAllDeviceNames()[0]
self.laser = self._master.lasersManager[self.laserName]
else:
self.laser = None
Hardware Access Examples
Camera Control
def captureImage(self):
"""Capture single image from camera"""
# Start acquisition if not running
if not self.camera.acquisition:
self.camera.startAcquisition()
# Capture frame
frame = self.camera.getLatestFrame()
# Convert to numpy array if needed
if hasattr(frame, 'getData'):
image = np.array(frame.getData())
else:
image = np.array(frame)
return image
def setCameraParameters(self, exposure_time=100, gain=1.0):
"""Set camera parameters"""
if hasattr(self.camera, 'setParameter'):
self.camera.setParameter('ExposureTime', exposure_time)
self.camera.setParameter('Gain', gain)
Stage Control
def moveStage(self, axis, position, relative=False):
"""Move stage to position"""
if relative:
current_pos = self.stage.position[axis]
target_pos = current_pos + position
else:
target_pos = position
# Move stage
self.stage.move(axis, target_pos, absolute=not relative)
# Wait for movement to complete
time.sleep(0.1)
while self.stage.isMoving(axis):
time.sleep(0.01)
def getStagePosition(self, axis):
"""Get current stage position"""
return self.stage.position[axis]
Laser/LED Control
def setIllumination(self, enabled, power=50):
"""Control illumination"""
if self.laser is not None:
if enabled:
self.laser.setValue(power) # Set power (0-100%)
else:
self.laser.setValue(0) # Turn off
# Or control via LED manager
if hasattr(self._master, 'ledsManager'):
led_manager = self._master.ledsManager
if led_manager.getAllDeviceNames():
led = led_manager[led_manager.getAllDeviceNames()[0]]
led.setValue(power if enabled else 0)
Autofocus Algorithm Implementation
def calculateFocusMetric(self, image):
"""Calculate focus metric from image"""
# Convert to grayscale if needed
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
else:
gray = image
# Apply Gaussian blur to reduce noise
blurred = gaussian(gray, sigma=1)
# Calculate focus metrics
# Method 1: Variance of Laplacian
laplacian = cv2.Laplacian(blurred, cv2.CV_64F)
focus_score = laplacian.var()
# Method 2: Sobel gradient magnitude
# sobelx = cv2.Sobel(blurred, cv2.CV_64F, 1, 0, ksize=3)
# sobely = cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=3)
# focus_score = np.mean(sobelx**2 + sobely**2)
return focus_score
def runAutofocus(self, z_start, z_end, z_step=10, axis='Z'):
"""Run autofocus scan"""
if self.isAutofocusRunning:
return
self.isAutofocusRunning = True
try:
# Generate Z positions
z_positions = np.arange(z_start, z_end + z_step, z_step)
focus_scores = []
# Turn on illumination
self.setIllumination(True, power=50)
time.sleep(0.1) # Wait for illumination to stabilize
for z_pos in z_positions:
# Move to Z position
self.moveStage(axis, z_pos, relative=False)
# Wait for settling
time.sleep(0.1)
# Capture image
image = self.captureImage()
# Calculate focus score
score = self.calculateFocusMetric(image)
focus_scores.append(score)
# Update GUI (if available)
self.sigUpdateFocusValue.emit(score)
# Log progress
self.__logger.info(f"Z={z_pos:.1f}, Focus Score={score:.2f}")
# Find best focus position
best_idx = np.argmax(focus_scores)
best_z = z_positions[best_idx]
best_score = focus_scores[best_idx]
# Move to best position
self.moveStage(axis, best_z, relative=False)
# Turn off illumination
self.setIllumination(False)
# Update plot
self.sigUpdateFocusPlot.emit(z_positions, focus_scores)
self.__logger.info(f"Autofocus complete. Best Z: {best_z:.1f}, Score: {best_score:.2f}")
return best_z, best_score
finally:
self.isAutofocusRunning = False
Real-time Processing Example
def startLiveProcessing(self):
"""Start live image processing"""
def processing_loop():
while self.isProcessingActive:
try:
# Capture frame
image = self.captureImage()
# Process image
processed = self.processImage(image)
# Make control decisions
self.makeControlDecisions(processed)
# Small delay to prevent overwhelming the system
time.sleep(0.01)
except Exception as e:
self.__logger.error(f"Processing error: {e}")
self.isProcessingActive = True
self.processing_thread = threading.Thread(target=processing_loop)
self.processing_thread.daemon = True
self.processing_thread.start()
def processImage(self, image):
"""Custom image processing pipeline"""
# Example: Edge detection and analysis
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Find contours
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
return {
'original': image,
'edges': edges,
'contours': contours,
'num_objects': len(contours)
}
def makeControlDecisions(self, processed_data):
"""Make hardware control decisions based on image analysis"""
num_objects = processed_data['num_objects']
# Example: Adjust illumination based on object count
if num_objects < 5:
# Too few objects - increase illumination
self.setIllumination(True, power=min(100, self.current_power + 10))
elif num_objects > 20:
# Too many objects - decrease illumination
self.setIllumination(True, power=max(10, self.current_power - 10))
Integration with ImSwitch API
@APIExport
def runAutofocusAPI(self, z_start: float, z_end: float, z_step: float = 10):
"""API endpoint for autofocus"""
try:
best_z, score = self.runAutofocus(z_start, z_end, z_step)
return {
'success': True,
'best_z_position': best_z,
'focus_score': score
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
@APIExport
def getFocusScore(self):
"""API endpoint to get current focus score"""
try:
image = self.captureImage()
score = self.calculateFocusMetric(image)
return {
'success': True,
'focus_score': score
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
General Hardware Access Patterns
Detector (Camera) Access
# Get all available cameras
camera_names = self._master.detectorsManager.getAllDeviceNames()
# Access specific camera
camera = self._master.detectorsManager[camera_name]
# Start/stop acquisition
camera.startAcquisition()
camera.stopAcquisition()
# Get latest frame
frame = camera.getLatestFrame()
Positioner (Stage) Access
# Get all available stages
stage_names = self._master.positionersManager.getAllDeviceNames()
# Access specific stage
stage = self._master.positionersManager[stage_name]
# Move stage
stage.move(axis='X', dist=100, absolute=True)
# Get position
position = stage.position['X']
Laser/LED Access
# Get all available lasers
laser_names = self._master.lasersManager.getAllDeviceNames()
# Access specific laser
laser = self._master.lasersManager[laser_name]
# Set power (0-100%)
laser.setValue(50)
# Enable/disable
laser.setEnabled(True)
This tutorial provides the foundation for creating sophisticated image processing controllers in ImSwitch with full hardware integration.
self._commChannel.sigAutoFocus.connect(self.autoFocus)
if not IS_HEADLESS:
self._widget.focusButton.clicked.connect(self.focusButton)
def __del__(self):
self._AutofocusThead.quit()
self._AutofocusThead.wait()
if hasattr(super(), '__del__'):
super().__del__()
def focusButton(self):
if not self.isAutofusRunning:
rangez = float(self._widget.zStepRangeEdit.text())
resolutionz = float(self._widget.zStepSizeEdit.text())
defocusz = float(self._widget.zBackgroundDefocusEdit.text())
self._widget.focusButton.setText('Stop')
self.autoFocus(rangez, resolutionz, defocusz)
else:
self.isAutofusRunning = False
@APIExport(runOnUIThread=True)
def autoFocus(self, rangez:int=100, resolutionz:int=10, defocusz:int=0):
self.isAutofusRunning = True
self._AutofocusThead = threading.Thread(
target=self.doAutofocusBackground,
args=(rangez, resolutionz, defocusz),
daemon=True
)
self._AutofocusThead.start()
@APIExport(runOnUIThread=True)
def stopAutofocus(self):
self.isAutofusRunning = False
def grabCameraFrame(self):
return self.camera.getLatestFrame()
def recordFlatfield(self, nFrames=10, nGauss=16, defocusPosition=200, defocusAxis="Z"):
flatfield = []
posStart = self.stages.getPosition()[defocusAxis]
time.sleep(1)
self.stages.move(value=defocusPosition, axis=defocusAxis, is_absolute=False, is_blocking=True)
for _ in range(nFrames):
flatfield.append(self.grabCameraFrame())
flatfield = np.mean(np.array(flatfield), 0)
flatfield = gaussian(flatfield, sigma=nGauss)
self.stages.move(value=-defocusPosition, axis=defocusAxis, is_absolute=False, is_blocking=True)
time.sleep(1)
return flatfield
def doAutofocusBackground(self, rangez=100, resolutionz=10, defocusz=0):
self._commChannel.sigAutoFocusRunning.emit(True)
mProcessor = FrameProcessor()
if defocusz != 0:
flatfieldImage = self.recordFlatfield(defocusPosition=defocusz)
mProcessor.setFlatfieldFrame(flatfieldImage)
initialPosition = self.stages.getPosition()["Z"]
Nz = int(2 * rangez // resolutionz)
relative_positions = np.int32(np.linspace(-abs(rangez), abs(rangez), Nz))
# Move to the first relative position
self.stages.move(value=relative_positions[0], axis="Z", is_absolute=False, is_blocking=True)
for iz in range(Nz):
if not self.isAutofusRunning:
break
if iz != 0:
step = relative_positions[iz] - relative_positions[iz - 1]
self.stages.move(value=step, axis="Z", is_absolute=False, is_blocking=True)
frame = self.grabCameraFrame()
mProcessor.add_frame(frame, iz)
allfocusvals = np.array(mProcessor.getFocusValueList(Nz))
mProcessor.stop()
if self.isAutofusRunning:
coordinates = relative_positions + initialPosition
if not IS_HEADLESS:
self._widget.focusPlotCurve.setData(coordinates[:len(allfocusvals)], allfocusvals)
else:
self.sigUpdateFocusPlot.emit(coordinates[:len(allfocusvals)], allfocusvals)
best_index = np.argmax(allfocusvals)
bestzpos_rel = relative_positions[best_index]
# Move to best focus
self.stages.move(value=-2 * rangez, axis="Z", is_absolute=False, is_blocking=True)
self.stages.move(value=(rangez + bestzpos_rel), axis="Z", is_absolute=False, is_blocking=True)
else:
# Return to initial absolute position if stopped
self.stages.move(value=initialPosition, axis="Z", is_absolute=True, is_blocking=True)
self._commChannel.sigAutoFocusRunning.emit(False)
self.isAutofusRunning = False
if not IS_HEADLESS:
self._widget.focusButton.setText('Autofocus')
final_z = bestzpos_rel + initialPosition if self.isAutofusRunning else initialPosition
self.sigUpdateFocusValue.emit({"bestzpos": final_z})
return final_z
class FrameProcessor: def init(self, nGauss=7, nCropsize=2048): self.isRunning = True self.frame_queue = queue.Queue() self.allfocusvals = [] self.worker_thread = threading.Thread(target=self.process_frames, daemon=True) self.worker_thread.start() self.flatFieldFrame = None self.nGauss = nGauss self.nCropsize = nCropsize
def setFlatfieldFrame(self, flatfieldFrame):
self.flatFieldFrame = flatfieldFrame
def add_frame(self, img, iz):
self.frame_queue.put((img, iz))
def process_frames(self):
while self.isRunning:
img, iz = self.frame_queue.get()
self.process_frame(img, iz)
def process_frame(self, img, iz):
if self.flatFieldFrame is not None:
img = img / self.flatFieldFrame
img = self.extract(img, self.nCropsize)
if len(img.shape) > 2:
img = np.mean(img, -1)
if 0:
imagearraygf = ndi.gaussian_filter(img, self.nGauss)
is_success, buffer = cv2.imencode(".jpg", imagearraygf, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
focusquality = len(buffer) if is_success else 0
else:
focusquality = self.calculate_focus_measure(img)
self.allfocusvals.append(focusquality)
def calculate_focus_measure(self, image, method="LAPE"):
if len(image.shape) == 3:
image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) # optional
if method == "LAPE":
if image.dtype == np.uint16:
lap = cv2.Laplacian(image, cv2.CV_32F)
else:
lap = cv2.Laplacian(image, cv2.CV_16S)
focus_measure = np.mean(np.square(lap))
elif method == "GLVA":
focus_measure = np.std(image, axis=None) # GLVA
else:
focus_measure = np.std(image, axis=None) # GLVA
return focus_measure
@staticmethod
def extract(marray, crop_size):
center_x, center_y = marray.shape[1] // 2, marray.shape[0] // 2
x_start = center_x - crop_size // 2
x_end = x_start + crop_size
y_start = center_y - crop_size // 2
y_end = y_start + crop_size
return marray[y_start:y_end, x_start:x_end]
def getFocusValueList(self, nFrameExpected, timeout=5):
t0 = time.time()
while len(self.allfocusvals) < nFrameExpected:
time.sleep(0.01)
if time.time() - t0 > timeout:
break
return self.allfocusvals
def stop(self):
self.isRunning = False
### Basic Image Enhancement
```python
# In ImSwitch scripting environment
import numpy as np
import cv2
from imswitch.imcontrol.model import SignalDesigner
class RealTimeProcessor:
def __init__(self):
self.enabled = True
self.contrast_factor = 1.5
self.brightness_offset = 0
def process_frame(self, image):
"""Process incoming camera frames in real-time"""
if not self.enabled:
return image
# Convert to float for processing
img_float = image.astype(np.float32)
# Contrast and brightness adjustment
processed = img_float * self.contrast_factor + self.brightness_offset
# Clip values to valid range
processed = np.clip(processed, 0, 255)
return processed.astype(np.uint8)
def enable_processing(self, enabled=True):
"""Enable/disable real-time processing"""
self.enabled = enabled
# Initialize processor
processor = RealTimeProcessor()
# Connect to ImSwitch camera stream
def on_new_frame(image):
processed_image = processor.process_frame(image)
# Display or save processed image
return processed_image
Advanced Filtering
import scipy.ndimage as ndi
from skimage import filters, morphology
class AdvancedProcessor:
def __init__(self):
self.gaussian_sigma = 1.0
self.threshold_method = 'otsu'
def denoise_image(self, image):
"""Apply denoising filters"""
# Gaussian denoising
denoised = filters.gaussian(image, sigma=self.gaussian_sigma)
# Non-local means denoising (for severe noise)
# denoised = cv2.fastNlMeansDenoising(image.astype(np.uint8))
return denoised
def enhance_contrast(self, image):
"""Enhance image contrast using adaptive histogram equalization"""
# CLAHE (Contrast Limited Adaptive Histogram Equalization)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
if len(image.shape) == 2:
enhanced = clahe.apply(image.astype(np.uint8))
else:
enhanced = np.stack([clahe.apply(image[:,:,i].astype(np.uint8))
for i in range(image.shape[2])], axis=2)
return enhanced
def segment_objects(self, image):
"""Segment objects using threshold and morphological operations"""
# Automatic thresholding
if self.threshold_method == 'otsu':
threshold = filters.threshold_otsu(image)
elif self.threshold_method == 'li':
threshold = filters.threshold_li(image)
binary = image > threshold
# Morphological operations to clean up segmentation
binary = morphology.remove_small_objects(binary, min_size=50)
binary = morphology.remove_small_holes(binary, area_threshold=64)
return binary, threshold
# Usage example
processor = AdvancedProcessor()
def process_microscopy_image(image):
# Step 1: Denoise
denoised = processor.denoise_image(image)
# Step 2: Enhance contrast
enhanced = processor.enhance_contrast(denoised)
# Step 3: Segment objects (if needed)
segmented, threshold = processor.segment_objects(enhanced)
return {
'original': image,
'denoised': denoised,
'enhanced': enhanced,
'segmented': segmented,
'threshold': threshold
}
Napari Integration
Setting Up Napari with ImSwitch
import napari
from imswitch.imcontrol.model import ImSwitchController
class NapariImageProcessor:
def __init__(self):
self.viewer = napari.Viewer()
self.imswitch = ImSwitchController()
def connect_to_imswitch(self):
"""Connect Napari to ImSwitch live stream"""
@self.imswitch.signal.new_image.connect
def update_napari(image):
# Update Napari viewer with new image
if 'live_image' in self.viewer.layers:
self.viewer.layers['live_image'].data = image
else:
self.viewer.add_image(image, name='live_image',
colormap='gray', scale=(1, 1))
def add_processing_layers(self, image_data):
"""Add multiple processing results as layers"""
# Original image
self.viewer.add_image(image_data['original'], name='Original',
colormap='gray')
# Processed versions
self.viewer.add_image(image_data['enhanced'], name='Enhanced',
colormap='viridis', visible=False)
# Segmentation overlay
self.viewer.add_labels(image_data['segmented'].astype(int),
name='Segmentation', opacity=0.5)
def setup_measurement_tools(self):
"""Set up measurement and annotation tools"""
# Add shapes layer for measurements
shapes_layer = self.viewer.add_shapes(name='Measurements')
# Add points layer for marking features
points_layer = self.viewer.add_points(name='Feature Points',
size=10, face_color='red')
return shapes_layer, points_layer
# Initialize Napari processor
napari_processor = NapariImageProcessor()
napari_processor.connect_to_imswitch()
Custom Napari Widgets
from napari.utils.notifications import show_info
from magicgui import magic_factory
import numpy as np
@magic_factory(
sigma={'widget_type': 'FloatSlider', 'min': 0.1, 'max': 5.0, 'step': 0.1},
threshold={'widget_type': 'FloatSlider', 'min': 0, 'max': 1.0, 'step': 0.01}
)
def process_image_widget(viewer: napari.Viewer, sigma: float = 1.0, threshold: float = 0.5):
"""Interactive image processing widget"""
if len(viewer.layers) == 0:
show_info("Please load an image first")
return
# Get current image
image = viewer.layers[0].data
# Apply Gaussian filter
filtered = filters.gaussian(image, sigma=sigma)
# Apply threshold
binary = filtered > (threshold * filtered.max())
# Update or add processed layers
if 'Filtered' in [layer.name for layer in viewer.layers]:
viewer.layers['Filtered'].data = filtered
else:
viewer.add_image(filtered, name='Filtered', colormap='plasma')
if 'Binary' in [layer.name for layer in viewer.layers]:
viewer.layers['Binary'].data = binary
else:
viewer.add_image(binary.astype(float), name='Binary', colormap='red')
# Add widget to Napari
# viewer.window.add_dock_widget(process_image_widget, area='right')
Multi-Channel Processing
Channel-Specific Processing
class MultiChannelProcessor:
def __init__(self):
self.channel_configs = {
'DAPI': {'wavelength': 405, 'exposure': 100, 'gain': 1.0},
'GFP': {'wavelength': 488, 'exposure': 200, 'gain': 1.5},
'RFP': {'wavelength': 561, 'exposure': 150, 'gain': 1.2}
}
def process_channel(self, image, channel_name):
"""Apply channel-specific processing"""
config = self.channel_configs.get(channel_name, {})
# Channel-specific noise reduction
if channel_name == 'DAPI':
# Strong denoising for nuclear staining
processed = cv2.bilateralFilter(image, 9, 75, 75)
elif channel_name in ['GFP', 'RFP']:
# Gentle denoising for fluorescent proteins
processed = filters.gaussian(image, sigma=0.8)
else:
processed = image
return processed
def align_channels(self, channels):
"""Align multiple channels to correct for chromatic aberration"""
reference_channel = channels[0]
aligned_channels = [reference_channel]
for channel in channels[1:]:
# Calculate phase correlation for alignment
shift, error, diffphase = phase_cross_correlation(
reference_channel, channel, upsample_factor=100)
# Apply shift correction
aligned = ndi.shift(channel, shift)
aligned_channels.append(aligned)
return aligned_channels
def create_composite(self, channels, channel_names, colors=None):
"""Create RGB composite from multiple channels"""
if colors is None:
colors = ['blue', 'green', 'red'][:len(channels)]
# Normalize each channel
normalized_channels = []
for channel in channels:
norm_channel = (channel - channel.min()) / (channel.max() - channel.min())
normalized_channels.append(norm_channel)
# Create RGB composite
composite = np.zeros((*channels[0].shape, 3))
color_map = {'blue': 2, 'green': 1, 'red': 0}
for channel, color in zip(normalized_channels, colors):
if color in color_map:
composite[:, :, color_map[color]] = channel
return composite
# Usage example
mc_processor = MultiChannelProcessor()
def process_multichannel_image(image_stack, channel_names):
"""Process a multi-channel image stack"""
# Split channels
channels = [image_stack[:, :, i] for i in range(image_stack.shape[2])]
# Process each channel
processed_channels = []
for channel, name in zip(channels, channel_names):
processed = mc_processor.process_channel(channel, name)
processed_channels.append(processed)
# Align channels
aligned_channels = mc_processor.align_channels(processed_channels)
# Create composite
composite = mc_processor.create_composite(aligned_channels, channel_names)
return {
'original_channels': channels,
'processed_channels': processed_channels,
'aligned_channels': aligned_channels,
'composite': composite
}
Batch Processing Workflows
Automated Image Processing Pipeline
import os
from pathlib import Path
import tifffile
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchProcessor:
def __init__(self, input_dir, output_dir):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def process_single_image(self, image_path):
"""Process a single image file"""
try:
# Load image
image = tifffile.imread(image_path)
# Apply processing pipeline
results = self.processing_pipeline(image)
# Save results
output_path = self.output_dir / f"processed_{image_path.name}"
tifffile.imwrite(output_path, results['enhanced'])
# Save segmentation if available
if 'segmented' in results:
seg_path = self.output_dir / f"segmented_{image_path.name}"
tifffile.imwrite(seg_path, results['segmented'].astype(np.uint8) * 255)
return f"Processed: {image_path.name}"
except Exception as e:
return f"Error processing {image_path.name}: {str(e)}"
def processing_pipeline(self, image):
"""Define your processing pipeline here"""
# Example pipeline
denoised = filters.gaussian(image, sigma=1.0)
enhanced = exposure.equalize_adapthist(denoised)
# Segmentation
threshold = filters.threshold_otsu(enhanced)
segmented = enhanced > threshold
return {
'enhanced': enhanced,
'segmented': segmented
}
def process_batch(self, file_pattern="*.tif", max_workers=4):
"""Process all images in input directory"""
image_files = list(self.input_dir.glob(file_pattern))
if not image_files:
print(f"No images found matching pattern {file_pattern}")
return
print(f"Processing {len(image_files)} images...")
# Process images in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(self.process_single_image, img_file): img_file
for img_file in image_files}
for future in as_completed(future_to_file):
result = future.result()
print(result)
print("Batch processing completed!")
# Usage
batch_processor = BatchProcessor(
input_dir="/path/to/input/images",
output_dir="/path/to/output/images"
)
batch_processor.process_batch(file_pattern="*.tif", max_workers=4)
Quality Control and Metrics
Image Quality Assessment
class ImageQualityAssessment:
def __init__(self):
self.metrics = {}
def calculate_snr(self, image, signal_region=None, noise_region=None):
"""Calculate Signal-to-Noise Ratio"""
if signal_region is None:
# Use center region as signal
h, w = image.shape[:2]
signal_region = image[h//4:3*h//4, w//4:3*w//4]
if noise_region is None:
# Use corner regions as noise
noise_region = np.concatenate([
image[:50, :50].flatten(),
image[-50:, -50:].flatten()
])
signal_mean = np.mean(signal_region)
noise_std = np.std(noise_region)
snr = signal_mean / noise_std if noise_std > 0 else float('inf')
return snr
def calculate_focus_measure(self, image):
"""Calculate focus quality using Laplacian variance"""
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image
laplacian = cv2.Laplacian(gray, cv2.CV_64F)
focus_measure = laplacian.var()
return focus_measure
def detect_blur(self, image, threshold=100):
"""Detect if image is blurry"""
focus_measure = self.calculate_focus_measure(image)
return focus_measure < threshold
def calculate_brightness_metrics(self, image):
"""Calculate brightness statistics"""
return {
'mean_intensity': np.mean(image),
'median_intensity': np.median(image),
'std_intensity': np.std(image),
'min_intensity': np.min(image),
'max_intensity': np.max(image)
}
def assess_image_quality(self, image):
"""Comprehensive image quality assessment"""
quality_metrics = {
'snr': self.calculate_snr(image),
'focus_measure': self.calculate_focus_measure(image),
'is_blurry': self.detect_blur(image),
'brightness': self.calculate_brightness_metrics(image)
}
return quality_metrics
# Usage in processing pipeline
qa = ImageQualityAssessment()
def quality_controlled_processing(image):
"""Process image only if quality criteria are met"""
quality = qa.assess_image_quality(image)
# Quality gates
if quality['snr'] < 5:
print("Warning: Low SNR detected")
if quality['is_blurry']:
print("Warning: Image appears blurry")
if quality['brightness']['mean_intensity'] < 10:
print("Warning: Image is very dark")
# Proceed with processing if quality is acceptable
if quality['snr'] > 3 and not quality['is_blurry']:
# Apply processing pipeline
processed = process_microscopy_image(image)
return processed, quality
else:
return None, quality
Custom Processing Plugins
Creating ImSwitch Processing Plugins
from imswitch.imcontrol.model.interfaces import ProcessingInterface
class CustomProcessingPlugin(ProcessingInterface):
def __init__(self):
super().__init__()
self.name = "Custom Image Processor"
self.description = "Custom processing pipeline for specific application"
def process(self, image, parameters=None):
"""Main processing function"""
if parameters is None:
parameters = self.get_default_parameters()
# Implement your custom processing here
processed_image = self.custom_algorithm(image, parameters)
return processed_image
def get_default_parameters(self):
"""Return default processing parameters"""
return {
'noise_reduction': True,
'enhancement_factor': 1.5,
'segmentation': False
}
def custom_algorithm(self, image, params):
"""Implement your custom processing algorithm"""
result = image.copy()
if params['noise_reduction']:
result = filters.gaussian(result, sigma=1.0)
if params['enhancement_factor'] != 1.0:
result = result * params['enhancement_factor']
return result
# Register plugin with ImSwitch
# plugin = CustomProcessingPlugin()
# imswitch.register_processing_plugin(plugin)
Performance Optimization
Optimizing Processing Speed
import numba
from numba import jit
@jit(nopython=True)
def fast_contrast_enhancement(image, factor):
"""JIT-compiled contrast enhancement for speed"""
output = np.empty_like(image)
flat_image = image.flatten()
flat_output = output.flatten()
for i in range(len(flat_image)):
flat_output[i] = min(255, max(0, flat_image[i] * factor))
return output
class OptimizedProcessor:
def __init__(self):
# Pre-compile JIT functions
dummy_image = np.zeros((100, 100), dtype=np.uint8)
fast_contrast_enhancement(dummy_image, 1.0)
def process_large_image(self, image, tile_size=512):
"""Process large images in tiles to manage memory"""
h, w = image.shape[:2]
processed = np.zeros_like(image)
for y in range(0, h, tile_size):
for x in range(0, w, tile_size):
y_end = min(y + tile_size, h)
x_end = min(x + tile_size, w)
tile = image[y:y_end, x:x_end]
processed_tile = self.process_tile(tile)
processed[y:y_end, x:x_end] = processed_tile
return processed
def process_tile(self, tile):
"""Process individual tile"""
return fast_contrast_enhancement(tile, 1.5)
Next Steps
- Jupyter Workflows - Interactive analysis with Jupyter notebooks
- Scripting and Automation - Advanced automation techniques
- Multi-Position Imaging - Automated scanning workflows