# Source code for causalpy.data.simulate_data

#   Copyright 2022 - 2026 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Functions that generate data sets used in examples
"""

import numpy as np
import pandas as pd
from scipy.stats import gamma
from statsmodels.nonparametric.smoothers_lowess import lowess

default_lowess_kwargs: dict[str, float | int] = {"frac": 0.2, "it": 0}
RANDOM_SEED: int = 8927


def _smoothed_gaussian_random_walk(
    gaussian_random_walk_mu: float,
    gaussian_random_walk_sigma: float,
    N: int,
    lowess_kwargs: dict,
    rng: np.random.Generator,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Generate a Gaussian random walk and smooth it with LOWESS.

    :param gaussian_random_walk_mu:
        Mean of the per-step increments of the random walk
    :param gaussian_random_walk_sigma:
        Standard deviation of the per-step increments of the random walk
    :param N:
        Number of points in the walk
    :param lowess_kwargs:
        Keyword argument dictionary passed to statsmodels lowess
    :param rng:
        NumPy random number generator instance
    """
    time_index = np.arange(N)
    # Cumulative sum of iid Gaussian increments gives the random walk
    raw_walk = np.cumsum(
        rng.normal(gaussian_random_walk_mu, gaussian_random_walk_sigma, N)
    )
    # lowess returns an (N, 2) array of (x, smoothed y) pairs
    smoothed = lowess(raw_walk, time_index, **lowess_kwargs)
    return (time_index, smoothed[:, 1])


def generate_synthetic_control_data(
    N: int = 100,
    treatment_time: int = 70,
    grw_mu: float = 0.25,
    grw_sigma: float = 1,
    lowess_kwargs: dict = default_lowess_kwargs,
    seed: int | None = None,
) -> tuple[pd.DataFrame, np.ndarray]:
    """
    Generate data for the synthetic control example.

    :param N: Number of data points
    :param treatment_time: Index where treatment begins in the generated dataframe
    :param grw_mu: Mean of Gaussian Random Walk
    :param grw_sigma: Standard deviation of Gaussian Random Walk
    :param lowess_kwargs: Keyword argument dictionary passed to statsmodels lowess
    :param seed: Random seed for reproducibility

    Example
    --------
    >>> from causalpy.data.simulate_data import generate_synthetic_control_data
    >>> df, weightings_true = generate_synthetic_control_data(
    ...     treatment_time=70, seed=42
    ... )
    """
    rng = np.random.default_rng(seed)

    # 1. Seven untreated predictor series, each an independent smoothed
    # Gaussian random walk (drawn in column order so rng state is deterministic)
    df = pd.DataFrame(
        {
            name: _smoothed_gaussian_random_walk(
                grw_mu, grw_sigma, N, lowess_kwargs, rng
            )[1]
            for name in "abcdefg"
        }
    )

    # 2. Counterfactual (NO treatment): Dirichlet-weighted sum of the
    # untreated predictors
    weightings_true = rng.dirichlet(np.ones(7), size=1)
    df["counterfactual"] = np.dot(df.to_numpy(), weightings_true.T)

    # 3. Causal effect: negative gamma-pdf bump starting at treatment_time
    causal_effect = gamma(10).pdf(np.arange(0, N, 1) - treatment_time)
    df["causal effect"] = causal_effect * -50

    # 4. Observed (treated) outcome = counterfactual + causal effect
    df["actual"] = df["counterfactual"] + df["causal effect"]

    # 5. Observation noise on the observed outcome and all predictors
    for var in ["actual", "a", "b", "c", "d", "e", "f", "g"]:
        df[var] += rng.normal(0, 0.25, N)

    return df, weightings_true
[docs] def generate_time_series_data_seasonal( treatment_time: pd.Timestamp, seed: int | None = None, ) -> pd.DataFrame: """ Generates 10 years of monthly data with seasonality :param treatment_time: Timestamp of when treatment begins :param seed: Random seed for reproducibility """ rng = np.random.default_rng(seed) dates = pd.date_range( start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="ME" ) df = pd.DataFrame(data={"date": dates}) df = df.assign( year=lambda x: x["date"].dt.year, month=lambda x: x["date"].dt.month, t=df.index, ).set_index("date", drop=True) month_effect = np.array([11, 13, 12, 15, 19, 23, 21, 28, 20, 17, 15, 12]) df["y"] = 0.2 * df["t"] + 2 * month_effect[np.asarray(df.month.values) - 1] N = df.shape[0] idx = np.arange(N)[df.index > treatment_time] df["causal effect"] = 100 * gamma(10).pdf( np.array(np.arange(0, N, 1)) - int(np.min(idx)) ) df["y"] += df["causal effect"] df["y"] += rng.normal(0, 2, N) # add intercept df["intercept"] = np.ones(df.shape[0]) return df
[docs] def generate_time_series_data_simple( treatment_time: pd.Timestamp, slope: float = 0.0, seed: int | None = None, ) -> pd.DataFrame: """Generate simple interrupted time series data, with no seasonality or temporal structure. :param treatment_time: Timestamp of when treatment begins :param slope: Slope of the linear trend :param seed: Random seed for reproducibility """ rng = np.random.default_rng(seed) dates = pd.date_range( start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="ME" ) df = pd.DataFrame(data={"date": dates}) df = df.assign( linear_trend=df.index, ).set_index("date", drop=True) df["timeseries"] = slope * df["linear_trend"] N = df.shape[0] df["causal effect"] = (df.index > treatment_time) * 2 df["timeseries"] += df["causal effect"] # add intercept df["intercept"] = np.ones(df.shape[0]) # add observation noise df["timeseries"] += rng.normal(0, 0.25, N) return df
[docs] def generate_did(seed: int | None = None) -> pd.DataFrame: """ Generate Difference in Differences data :param seed: Random seed for reproducibility Example -------- >>> from causalpy.data.simulate_data import generate_did >>> df = generate_did(seed=42) """ rng = np.random.default_rng(seed) # true parameters control_intercept = 1 treat_intercept_delta = 0.25 trend = 1 Δ = 0.5 intervention_time = 0.5 # local functions def outcome( t: np.ndarray, control_intercept: float, treat_intercept_delta: float, trend: float, Δ: float, group: np.ndarray, post_treatment: np.ndarray, ) -> np.ndarray: """Compute the outcome of each unit""" return ( control_intercept + (treat_intercept_delta * group) + (t * trend) + (Δ * post_treatment * group) ) df = pd.DataFrame( { "group": [0, 0, 1, 1] * 10, "t": [0.0, 1.0, 0.0, 1.0] * 10, "unit": np.concatenate([[i] * 2 for i in range(20)]), } ) df["post_treatment"] = df["t"] > intervention_time df["y"] = outcome( np.asarray(df["t"]), control_intercept, treat_intercept_delta, trend, Δ, np.asarray(df["group"]), np.asarray(df["post_treatment"]), ) df["y"] += rng.normal(0, 0.1, df.shape[0]) return df
[docs] def generate_regression_discontinuity_data( N: int = 100, true_causal_impact: float = 0.5, true_treatment_threshold: float = 0.0, seed: int | None = None, ) -> pd.DataFrame: """ Generate regression discontinuity example data :param seed: Random seed for reproducibility Example -------- >>> import pathlib >>> from causalpy.data.simulate_data import generate_regression_discontinuity_data >>> df = generate_regression_discontinuity_data( ... true_treatment_threshold=0.5, seed=42 ... ) """ rng = np.random.default_rng(seed) def is_treated(x: np.ndarray) -> np.ndarray: """Check if x was treated""" return np.greater_equal(x, true_treatment_threshold) def impact(x: np.ndarray) -> np.ndarray: """Assign true_causal_impact to all treated entries""" y = np.zeros(len(x)) y[is_treated(x)] = true_causal_impact return y x = np.sort(rng.uniform(-1, 1, size=N)) y = np.sin(x * 3) + impact(x) + rng.normal(0, 0.1, size=N) return pd.DataFrame({"x": x, "y": y, "treated": is_treated(x)})
[docs] def generate_ancova_data( N: int = 200, pre_treatment_means: np.ndarray | None = None, treatment_effect: int = 2, sigma: int = 1, seed: int | None = None, ) -> pd.DataFrame: """ Generate ANCOVA example data :param seed: Random seed for reproducibility Example -------- >>> import pathlib >>> from causalpy.data.simulate_data import generate_ancova_data >>> df = generate_ancova_data( ... N=200, ... pre_treatment_means=np.array([10, 12]), ... treatment_effect=2, ... sigma=1, ... seed=42, ... ) """ rng = np.random.default_rng(seed) if pre_treatment_means is None: pre_treatment_means = np.array([10, 12]) group = rng.choice(2, size=N) pre = rng.normal(loc=pre_treatment_means[group]) post = pre + treatment_effect * group + rng.normal(size=N) * sigma df = pd.DataFrame({"group": group, "pre": pre, "post": post}) return df
def generate_geolift_data(seed: int | None = None) -> pd.DataFrame:
    """Generate synthetic geolift data using a latent factor model.

    Each unit's time series is a linear combination of K=3 shared seasonal
    factors (GP draws) with unit-specific loadings, plus observation noise.
    Most countries share positive loadings and are therefore positively
    correlated, while 2 "contrarian" countries carry a negative loading on one
    factor, making them negatively correlated with the majority. The treated
    unit (Denmark) is a Dirichlet-weighted combination of the
    positively-loaded countries only, so it is well-reconstructed by good
    donors but poorly correlated with the contrarian ones. This mirrors the
    latent factor DGP used to motivate synthetic control methods in Abadie
    (2010, 2021).
    """
    rng = np.random.default_rng(seed)
    n_years = 4
    treatment_time = pd.to_datetime("2022-01-01")
    causal_impact = 0.2

    time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")
    n_obs = len(time)

    # K shared seasonal latent factors, one column each -> (n_obs, K)
    K = 3
    factors = np.column_stack(
        [
            _create_series(
                n=52,
                amplitude=1,
                length_scale=2,
                n_years=n_years,
                intercept=0,
                rng=rng,
            )
            for _ in range(K)
        ]
    )

    similar = [
        "Austria",
        "Belgium",
        "Bulgaria",
        "Croatia",
        "Cyprus",
        "Czech_Republic",
        "Estonia",
        "Finland",
    ]
    contrarian = ["Greece", "Hungary"]
    untreated = similar + contrarian

    # Positive loadings for the similar countries; the contrarians each carry
    # negative loadings on some factors
    factor_loadings: dict[str, np.ndarray] = {
        country: rng.uniform(0.3, 1.0, size=K) for country in similar
    }
    factor_loadings["Greece"] = np.array([-0.6, -0.3, 0.8])
    factor_loadings["Hungary"] = np.array([0.3, -0.7, -0.5])

    df = pd.DataFrame(index=time)
    df.index.name = "time"
    for country in untreated:
        df[country] = (
            factors @ factor_loadings[country] + 3 + rng.normal(0, 0.1, size=n_obs)
        )

    # Denmark: Dirichlet-weighted sum of the similar countries only
    donor_weights = rng.dirichlet(np.ones(len(similar)))
    df["Denmark"] = df[similar].values @ donor_weights + rng.normal(0, 0.1, size=n_obs)

    # Step treatment effect from treatment_time onwards
    df["Denmark"] += np.where(df.index < treatment_time, 0, causal_impact)

    # Ensure we never see any negative sales
    df = df.clip(lower=0)
    return df
def generate_multicell_geolift_data(
    seed: int | None = None,
) -> pd.DataFrame:
    """Generate synthetic data for a geolift example.

    The data consist of 12 untreated geos and 4 treated geos, where each
    treated unit is a Dirichlet-weighted combination of the untreated units.
    A step treatment effect is applied after the ``treatment_time``. The
    timeseries data is observed at weekly resolution and has annual
    seasonality drawn from a Gaussian Process with a periodic kernel.

    :param seed:
        Random seed for reproducibility
    """
    rng = np.random.default_rng(seed)
    n_years = 4
    treatment_time = pd.to_datetime("2022-01-01")
    causal_impact = 0.2
    time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")

    untreated = [f"u{i}" for i in range(1, 13)]

    # One GP-seasonality series per untreated geo, indexed by week
    df = (
        pd.DataFrame(
            {
                geo: _create_series(
                    n=52,
                    amplitude=1,
                    length_scale=2,
                    n_years=n_years,
                    intercept=3,
                    rng=rng,
                )
                for geo in untreated
            }
        )
        .assign(time=time)
        .set_index("time")
    )

    treated = ["t1", "t2", "t3", "t4"]
    for treated_geo in treated:
        # Treated unit = Dirichlet-weighted sum of the untreated units
        weights = rng.dirichlet(np.ones(len(untreated)))
        df[treated_geo] = df[untreated].values @ weights
        # Step treatment effect after the treatment time
        df[treated_geo] += np.where(df.index < treatment_time, 0, causal_impact)

    # Observation noise for every geo
    for col in untreated + treated:
        df[col] += rng.normal(size=len(df), scale=0.1)

    # Ensure we never see any negative sales
    df = df.clip(lower=0)
    return df
# ----------------- # UTILITY FUNCTIONS # ----------------- def _generate_seasonality( n: int, amplitude: int, length_scale: float, rng: np.random.Generator, ) -> np.ndarray: """Generate monthly seasonality by sampling from a Gaussian process with a Gaussian kernel, using numpy code :param rng: NumPy random number generator instance """ # Generate the covariance matrix x = np.linspace(0, 1, n) x1, x2 = np.meshgrid(x, x) cov = _periodic_kernel( x1, x2, period=1, length_scale=length_scale, amplitude=amplitude ) # Generate the seasonality return rng.multivariate_normal(np.zeros(n), cov) def _periodic_kernel( x1: np.ndarray, x2: np.ndarray, period: int = 1, length_scale: float = 1.0, amplitude: int = 1, ) -> np.ndarray: """Generate a periodic kernel for gaussian process""" return amplitude**2 * np.exp( -2 * np.sin(np.pi * np.abs(x1 - x2) / period) ** 2 / length_scale**2 ) def _create_series( n: int, amplitude: int, length_scale: int, n_years: int, intercept: int, rng: np.random.Generator, ) -> np.ndarray: """ Returns numpy tile with generated seasonality data repeated over multiple years :param rng: NumPy random number generator instance """ return np.tile( _generate_seasonality( n=n, amplitude=amplitude, length_scale=length_scale, rng=rng ) + intercept, n_years, )
[docs] def generate_staggered_did_data( n_units: int = 50, n_time_periods: int = 20, treatment_cohorts: dict[int, int] | None = None, treatment_effects: dict[int, float] | None = None, unit_fe_scale: float = 2.0, time_fe_scale: float = 1.0, sigma: float = 0.5, seed: int | None = None, ) -> pd.DataFrame: """ Generate synthetic panel data with staggered treatment adoption. Creates a balanced panel dataset where different cohorts of units receive treatment at different times. Supports dynamic treatment effects that vary by event-time (time relative to treatment). Parameters ---------- n_units : int, default=50 Total number of units in the panel. n_time_periods : int, default=20 Number of time periods in the panel. treatment_cohorts : dict[int, int], optional Dictionary mapping treatment time to number of units in that cohort. Units not assigned to any cohort are never-treated. Default: {5: 10, 10: 10, 15: 10} (3 cohorts of 10 units each, leaving 20 never-treated units). treatment_effects : dict[int, float], optional Dictionary mapping event-time (t - G) to treatment effect. Event-time 0 is the first treated period. Default: {0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5} with constant effect of 2.5 for all subsequent periods. unit_fe_scale : float, default=2.0 Scale of unit fixed effects (drawn from Normal(0, unit_fe_scale)). time_fe_scale : float, default=1.0 Scale of time fixed effects (drawn from Normal(0, time_fe_scale)). sigma : float, default=0.5 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. 
Returns ------- pd.DataFrame Panel data with columns: - unit: Unit identifier - time: Time period - treated: Binary indicator (1 if treated at time t, 0 otherwise) - treatment_time: Time of treatment adoption (np.inf for never-treated) - y: Observed outcome - y0: Counterfactual outcome (for validation) - tau: True treatment effect (for validation) Examples -------- >>> from causalpy.data.simulate_data import generate_staggered_did_data >>> df = generate_staggered_did_data(n_units=30, n_time_periods=15, seed=42) >>> df.head() unit time treated treatment_time ... Notes ----- The data generating process is: .. math:: Y_{it} = \\alpha_i + \\lambda_t + \\tau_{it} \\cdot D_{it} + \\varepsilon_{it} where :math:`\\alpha_i` is the unit fixed effect, :math:`\\lambda_t` is the time fixed effect, :math:`D_{it}` is the treatment indicator, and :math:`\\tau_{it}` is the dynamic treatment effect that depends on event-time :math:`e = t - G_i`. """ rng = np.random.default_rng(seed) # Default treatment cohorts: 3 cohorts at times 5, 10, 15 if treatment_cohorts is None: treatment_cohorts = {5: 10, 10: 10, 15: 10} # Default dynamic treatment effects: ramp up then stabilize if treatment_effects is None: treatment_effects = {0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5} # Validate cohort assignments don't exceed n_units total_treated = sum(treatment_cohorts.values()) if total_treated > n_units: raise ValueError( f"Total units in treatment cohorts ({total_treated}) " f"exceeds n_units ({n_units})" ) # Generate unit fixed effects unit_fe = rng.normal(0, unit_fe_scale, n_units) # Generate time fixed effects time_fe = rng.normal(0, time_fe_scale, n_time_periods) # Assign treatment times to units treatment_times = np.full(n_units, np.inf) # Default: never treated unit_idx = 0 for g, n_cohort in treatment_cohorts.items(): treatment_times[unit_idx : unit_idx + n_cohort] = g unit_idx += n_cohort # Shuffle treatment assignments rng.shuffle(treatment_times) # Build panel data rows = [] for i in range(n_units): 
for t in range(n_time_periods): g_i = treatment_times[i] is_treated = t >= g_i # Counterfactual outcome (no treatment) y0 = unit_fe[i] + time_fe[t] # Treatment effect based on event-time if is_treated: event_time = int(t - g_i) # Use specified effect or last available effect for later periods if event_time in treatment_effects: tau = treatment_effects[event_time] else: # Use the effect for the maximum specified event-time max_event_time = max(treatment_effects.keys()) tau = treatment_effects[max_event_time] else: tau = 0.0 # Add noise epsilon = rng.normal(0, sigma) # Observed outcome y = y0 + tau + epsilon rows.append( { "unit": i, "time": t, "treated": int(is_treated), "treatment_time": g_i, "y": y, "y0": y0, "tau": tau, } ) df = pd.DataFrame(rows) return df
[docs] def generate_piecewise_its_data( N: int = 100, interruption_times: list[int] | None = None, baseline_intercept: float = 10.0, baseline_slope: float = 0.1, level_changes: list[float] | None = None, slope_changes: list[float] | None = None, noise_sigma: float = 1.0, seed: int | None = None, ) -> tuple[pd.DataFrame, dict]: """ Generate piecewise Interrupted Time Series data with known ground truth parameters. This function creates synthetic data for testing and demonstrating piecewise ITS / segmented regression models. The data follows the model: y_t = β₀ + β₁t + Σₖ(level_k · I_k(t) + slope_k · R_k(t)) + ε_t Where: - I_k(t) = 1 if t >= T_k else 0 (step function for level change) - R_k(t) = max(0, t - T_k) (ramp function for slope change) Parameters ---------- N : int, default=100 Number of time points in the series. interruption_times : list[int], optional List of time indices where interruptions occur. Defaults to [50]. baseline_intercept : float, default=10.0 The intercept (β₀) of the baseline trend. baseline_slope : float, default=0.1 The slope (β₁) of the baseline trend. level_changes : list[float], optional List of level changes at each interruption. Length must match interruption_times. If None, defaults to [5.0] for single interruption. slope_changes : list[float], optional List of slope changes at each interruption. Length must match interruption_times. If None, defaults to [0.0] (no slope change). noise_sigma : float, default=1.0 Standard deviation of the Gaussian noise. seed : int, optional Random seed for reproducibility. 
Returns ------- df : pd.DataFrame DataFrame with columns: - 't': time index (0 to N-1) - 'y': observed outcome with noise - 'y_true': outcome without noise (ground truth) - 'counterfactual': baseline trend without intervention effects - 'effect': true causal effect at each time point params : dict Dictionary containing the true parameters: - 'baseline_intercept': β₀ - 'baseline_slope': β₁ - 'level_changes': list of level changes - 'slope_changes': list of slope changes - 'interruption_times': list of interruption times - 'noise_sigma': noise standard deviation Examples -------- >>> from causalpy.data.simulate_data import generate_piecewise_its_data >>> # Single interruption with level and slope change >>> df, params = generate_piecewise_its_data( ... N=100, ... interruption_times=[50], ... level_changes=[5.0], ... slope_changes=[0.2], ... seed=42, ... ) >>> df.shape (100, 5) >>> # Multiple interruptions >>> df, params = generate_piecewise_its_data( ... N=150, ... interruption_times=[50, 100], ... level_changes=[3.0, -2.0], ... slope_changes=[0.1, -0.15], ... seed=42, ... ) >>> len(params["interruption_times"]) 2 >>> # Level change only (no slope change) >>> df, params = generate_piecewise_its_data( ... N=100, ... interruption_times=[50], ... level_changes=[5.0], ... slope_changes=[0.0], ... seed=42, ... 
) """ # Set defaults if interruption_times is None: interruption_times = [50] n_interruptions = len(interruption_times) if level_changes is None: level_changes = [5.0] * n_interruptions if slope_changes is None: slope_changes = [0.0] * n_interruptions # Validate inputs if len(level_changes) != n_interruptions: raise ValueError( f"level_changes length ({len(level_changes)}) must match " f"interruption_times length ({n_interruptions})" ) if len(slope_changes) != n_interruptions: raise ValueError( f"slope_changes length ({len(slope_changes)}) must match " f"interruption_times length ({n_interruptions})" ) for t_k in interruption_times: if t_k < 0 or t_k >= N: raise ValueError( f"Interruption time {t_k} is outside valid range [0, {N - 1}]" ) rng = np.random.default_rng(seed) # Generate time index t = np.arange(N) # Compute baseline (counterfactual) counterfactual = baseline_intercept + baseline_slope * t # Compute intervention effects effect = np.zeros(N) for k, t_k in enumerate(interruption_times): # Step function: I_k(t) = 1 if t >= t_k step = (t >= t_k).astype(float) # Ramp function: R_k(t) = max(0, t - t_k) ramp = np.maximum(0, t - t_k).astype(float) effect += level_changes[k] * step + slope_changes[k] * ramp # Compute true outcome (without noise) y_true = counterfactual + effect # Add noise noise = rng.normal(0, noise_sigma, N) y = y_true + noise # Create DataFrame df = pd.DataFrame( { "t": t, "y": y, "y_true": y_true, "counterfactual": counterfactual, "effect": effect, } ) # Store parameters params = { "baseline_intercept": baseline_intercept, "baseline_slope": baseline_slope, "level_changes": level_changes, "slope_changes": slope_changes, "interruption_times": interruption_times, "noise_sigma": noise_sigma, } return df, params