import numpy as np
from scipy.interpolate import interp1d


def replace_nans_with_local_statistic(arr, window_size=3, method='mean',
                                      max_iterations=10,
                                      use_interpolation=False,
                                      max_window_size=None):
    """
    Replace NaN values in a 1D array with the local mean or median of
    surrounding values, with an adaptive window size and optional linear
    interpolation.

    Each NaN starts from ``window_size``; when that window holds no valid
    value it is widened by 2 (one sample on each side) until a valid value
    appears, ``max_window_size`` would be exceeded, or the window already
    spans the whole array. The widening is local to that NaN and never
    affects the window used for other positions.

    Positions are processed left to right and the result is updated as it is
    built, so a value filled earlier in a pass can contribute to the windows
    of later positions. NaNs that could not be filled are retried on the next
    pass, up to ``max_iterations`` passes.

    Parameters:
    - arr: Input 1D array with NaN values (list or numpy array). It is never
      modified; a float copy is made.
    - window_size: Initial window size to compute local statistic (must be a
      positive odd number).
    - method: 'mean' or 'median' to specify the replacement method.
    - max_iterations: Maximum number of passes over the array (useful for
      runs of consecutive NaNs when ``max_window_size`` limits the window).
    - use_interpolation: Whether to use linear interpolation for NaN values
      instead of local statistics (default: False).
    - max_window_size: The maximum allowable window size when expanding
      (None means no limit other than the array length).

    Returns:
    - New float numpy array with NaNs replaced by local statistics or
      interpolation. NaNs that could not be filled (for example when every
      value is NaN) are left in place.

    Raises:
    - ValueError: If ``method`` is not 'mean' or 'median', if ``window_size``
      is even or not positive, or if ``max_window_size`` is smaller than
      ``window_size``.
    """
    if method not in ('mean', 'median'):
        raise ValueError("Method must be 'mean' or 'median'")
    if window_size % 2 == 0:
        raise ValueError("Window size must be an odd number")
    if window_size < 1:
        # A negative odd size would pass the parity check but yields empty
        # windows, so nothing could ever be filled.
        raise ValueError("Window size must be a positive odd number")
    if max_window_size is not None and max_window_size < window_size:
        raise ValueError("max_window_size must be greater than or equal to window_size")

    # np.array copies by default, so the caller's data is never touched.
    result = np.array(arr, dtype=float)

    if use_interpolation:
        return interpolate_nans(result)

    statistic = np.mean if method == 'mean' else np.median

    for _ in range(max_iterations):
        filled_any = False
        for i in np.flatnonzero(np.isnan(result)):
            valid_values = _valid_values_near(
                result, i, window_size, max_window_size
            )
            if valid_values.size:
                result[i] = statistic(valid_values)
                filled_any = True
        if not filled_any:
            # Nothing left to fill, or nothing reachable: further passes
            # would see exactly the same data.
            break

    return result


def _valid_values_near(values, index, window_size, max_window_size):
    """Return the non-NaN values in the smallest allowed window around index.

    The window starts at ``window_size`` and grows by 2 while it contains no
    valid value, stopping once growing further would exceed
    ``max_window_size`` (when given) or once it covers the whole array. An
    empty array is returned when no valid value can be reached.
    """
    n = len(values)
    size = window_size
    while True:
        half = size // 2
        start = max(0, index - half)
        end = min(n, index + half + 1)
        window = values[start:end]
        valid = window[~np.isnan(window)]
        if valid.size:
            return valid

        covers_everything = start == 0 and end == n
        at_cap = max_window_size is not None and size + 2 > max_window_size
        if covers_everything or at_cap:
            return valid
        size += 2


def interpolate_nans(arr):
    """
    Interpolates NaN values in the array using linear interpolation.

    NaNs between valid samples are linearly interpolated over the sample
    index; NaNs before the first or after the last valid sample are linearly
    extrapolated from the nearest segment.

    Parameters:
    - arr: Input 1D array with NaN values (list or numpy array). A float
      numpy array is filled in place; any other input is converted to a new
      float array first.

    Returns:
    - Array with NaN values replaced by linearly interpolated values. When
      there are no NaNs, or fewer than two valid samples to define a line,
      the (converted) array is returned unchanged.
    """
    # asarray avoids a copy for float ndarrays (keeping the in-place fill)
    # while still accepting plain lists.
    values = np.asarray(arr, dtype=float)

    nan_mask = np.isnan(values)
    valid_indices = np.flatnonzero(~nan_mask)

    # Nothing to fill, or not enough anchor points for a linear fit.
    if not nan_mask.any() or valid_indices.size < 2:
        return values

    interpolator = interp1d(
        valid_indices,
        values[valid_indices],
        kind='linear',
        fill_value='extrapolate',
    )
    values[nan_mask] = interpolator(np.flatnonzero(nan_mask))

    return values