I am trying to create examples to compare and contrast Bayesian MCMC (e.g. HMC) with non-Bayesian equivalents. One of the cases I am finding difficult is fitting a mixture of gamma distributions.
I first had some provisional success with a mixture of two distributions:
import numpy as np
from scipy.stats import gamma, rv_continuous
import matplotlib.pyplot as plt
from scipy.optimize import minimize
class gamma_mixture(rv_continuous):
    def _pdf(self, x, w, a1, scale1, a2, scale2):
        return w * gamma.pdf(x, a1, scale=scale1) + (1 - w) * gamma.pdf(x, a2, scale=scale2)

    def fit(self, data):
        def log_likelihood(params):
            w, a1, scale1, a2, scale2 = params
            mixture = w * gamma.pdf(data, a1, scale=scale1) + (1 - w) * gamma.pdf(data, a2, scale=scale2)
            return -np.sum(np.log(mixture))

        initial_params = [0.8, 2.0, 2.0, 10.0, 1.0]
        bounds = [(0, 1), (0, None), (0, None), (0, None), (0, None)]
        result = minimize(log_likelihood, initial_params, bounds=bounds, method='L-BFGS-B')
        if result.success:
            self.fitted_params = result.x
        else:
            raise RuntimeError("Optimization failed")
# Generate sample data
np.random.seed(2018)
data = np.concatenate([
    gamma.rvs(a=2.0, scale=2.0, size=100),
    gamma.rvs(a=20.0, scale=1.0, size=100)
])
# Define and fit the gamma mixture model to the data
custom_gamma_mixture = gamma_mixture(name='gamma_mixture')
custom_gamma_mixture.fit(data)
w, a1, scale1, a2, scale2 = custom_gamma_mixture.fitted_params
# Evaluate the PDF of the fitted mixture model
x = np.linspace(data.min(), data.max(), 1000)
pdf_vals = custom_gamma_mixture.pdf(x, w, a1, scale1, a2, scale2)
# Plot the fitted PDF against the histogram of the data
fig, axes = plt.subplots(2, sharex=True)
axes[0].hist(data, bins=30, density=True, alpha=0.6, color='g', label='Data Histogram')
axes[0].plot(x, pdf_vals, 'r-', lw=2, label='Fitted Mixture PDF')
axes[0].set_title('Original Sample')
axes[1].hist(custom_gamma_mixture(*custom_gamma_mixture.fitted_params).rvs(size=200), bins=30, density=True, alpha=0.6, color='b', label='Data Histogram')
axes[1].plot(x, pdf_vals, 'r-', lw=2, label='Fitted Mixture PDF')
axes[1].set_title('New Sample')
plt.tight_layout()
plt.show()
# Output fitted parameters
print("Fitted Parameters:")
print(f"w: {w:.4f}")
print(f"a1: {a1:.4f}, scale1: {scale1:.4f}")
print(f"a2: {a2:.4f}, scale2: {scale2:.4f}")
I then tried to generalize to multiple components and found that the optimization either failed to converge or the fitted distribution just didn't look right when plotted. Here is an example:
import numpy as np
from scipy.stats import gamma
from scipy.optimize import minimize
from typing import Tuple
class GammaMixture:
    def __init__(self, n_components: int):
        self.n_components = n_components
        self.weights = np.ones(n_components) / n_components
        self.alphas = np.ones(n_components)
        self.scales = np.ones(n_components)

    def _pdf(self, x: np.ndarray) -> np.ndarray:
        # Sum the weighted component densities (built-in sum over the arrays;
        # np.sum over a generator expression does not do what is intended here).
        mixture = sum(self.weights[i] * gamma.pdf(x, self.alphas[i], scale=self.scales[i]) for i in range(self.n_components))
        return mixture

    def _negative_log_likelihood(self, params: np.ndarray, data: np.ndarray) -> float:
        self.weights, self.alphas, self.scales = np.split(params, [self.n_components, 2 * self.n_components])
        self.weights = np.exp(self.weights) / np.sum(np.exp(self.weights))  # softmax so the weights sum to 1
        neg_log_likelihood = -np.sum(np.log(self._pdf(data)))
        return neg_log_likelihood

    def fit(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        initial_params = np.concatenate([np.zeros(self.n_components), np.ones(2 * self.n_components)])
        bounds = [(0, None)] * self.n_components + [(0, None)] * (2 * self.n_components)
        result = minimize(self._negative_log_likelihood, initial_params, args=(data,), bounds=bounds)
        if result.success:
            self.weights, self.alphas, self.scales = np.split(result.x, [self.n_components, 2 * self.n_components])
            self.weights = np.exp(self.weights) / np.sum(np.exp(self.weights))  # softmax so the weights sum to 1
            return self.weights, self.alphas, self.scales
        else:
            raise RuntimeError("Optimization failed")

    def sample(self, n_samples: int) -> np.ndarray:
        components = np.random.choice(self.n_components, size=n_samples, p=self.weights)
        samples = np.array([gamma.rvs(self.alphas[i], scale=self.scales[i]) for i in components])
        return samples
# Example usage:
np.random.seed(0)
data = np.concatenate([
    gamma.rvs(a=2.0, scale=2.0, size=100),
    gamma.rvs(a=20.0, scale=1.0, size=100)
])
n_components = 3
gamma_mixture = GammaMixture(n_components)
weights, alphas, scales = gamma_mixture.fit(data)
print("Fitted Parameters:")
print("Weights:", weights)
print("Alphas:", alphas)
print("Scales:", scales)
# Generate samples from the fitted model
samples = gamma_mixture.sample(n_samples=1000)
import matplotlib.pyplot as plt
# Plot histograms
plt.figure(figsize=(10, 5))
# Histogram of original data
plt.subplot(1, 2, 1)
plt.hist(data, bins=30, density=True, color='blue', alpha=0.6, label='Original Data')
plt.title('Histogram of Original Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
# Histogram of new samples
plt.subplot(1, 2, 2)
plt.hist(samples, bins=30, density=True, color='orange', alpha=0.6, label='New Samples')
plt.title('Histogram of New Samples')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
plt.tight_layout()
plt.show()
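One thing I was unsure about in this version is whether evaluating the likelihood through raw probabilities underflows for some parameter values, so for reference here is a minimal sketch of the same negative log-likelihood computed in log space with scipy.special.logsumexp (the helper name mixture_neg_log_likelihood and the unconstrained log/logit parameterization are just illustrative, not part of the class above):

import numpy as np
from scipy.stats import gamma
from scipy.special import logsumexp

def mixture_neg_log_likelihood(params, data, n_components):
    # params holds unconstrained values: [weight logits, log-shapes, log-scales]
    logits, log_alphas, log_scales = np.split(params, [n_components, 2 * n_components])
    log_weights = logits - logsumexp(logits)  # softmax in log space
    alphas, scales = np.exp(log_alphas), np.exp(log_scales)
    # Rows are log(w_k) + log Gamma(x | alpha_k, scale_k), shape (K, N)
    log_terms = np.array([lw + gamma.logpdf(data, a, scale=s)
                          for lw, a, s in zip(log_weights, alphas, scales)])
    return -np.sum(logsumexp(log_terms, axis=0))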
I wondered whether I should be using expectation maximization instead, so I tried that as well, using KMeans clustering to give the process a warm start:
import numpy as np
from scipy.stats import gamma
from sklearn.cluster import KMeans
class GammaMixture:
    def __init__(self, n_components: int):
        """
        Initialize the Gamma Mixture Model.
        Args:
            n_components (int): Number of gamma distributions (components) in the mixture.
        """
        self.n_components = n_components
        self.weights = np.ones(n_components) / n_components
        self.alphas = np.ones(n_components)
        self.scales = np.ones(n_components)
        self.fitted = False

    def _e_step(self, data: np.ndarray) -> np.ndarray:
        """
        E-step: Calculate the responsibilities.
        Args:
            data (np.ndarray): Observed data.
        Returns:
            np.ndarray: Responsibilities of each component for each data point.
        """
        responsibilities = np.zeros((data.shape[0], self.n_components))
        for i in range(self.n_components):
            responsibilities[:, i] = self.weights[i] * gamma.pdf(data, a=self.alphas[i], scale=self.scales[i])
        sum_responsibilities = np.sum(responsibilities, axis=1).reshape(-1, 1)
        if np.any(sum_responsibilities == 0):
            raise ValueError("Some data points have zero responsibilities.")
        responsibilities /= sum_responsibilities
        return responsibilities

    def _m_step(self, data: np.ndarray, responsibilities: np.ndarray):
        """
        M-step: Update the parameters of the gamma distributions and the weights.
        Args:
            data (np.ndarray): Observed data.
            responsibilities (np.ndarray): Responsibilities of each component for each data point.
        """
        total_resp = np.sum(responsibilities, axis=0)
        self.weights = total_resp / data.shape[0]
        for i in range(self.n_components):
            resp = responsibilities[:, i]
            weighted_data_sum = np.sum(resp * data)
            weighted_log_data_sum = np.sum(resp * np.log(data))
            if total_resp[i] == 0 or weighted_data_sum == 0 or weighted_log_data_sum == 0:
                raise ValueError(f"Invalid weighted sums for component {i}: total_resp={total_resp[i]}, weighted_data_sum={weighted_data_sum}, weighted_log_data_sum={weighted_log_data_sum}")
            # Shape update: total_resp / (sum(resp * log x) - total_resp * log(weighted mean));
            # scale then chosen so that alpha * scale matches the weighted mean.
            self.alphas[i] = total_resp[i] / (np.sum(resp * np.log(data)) - np.sum(resp) * np.log(weighted_data_sum / total_resp[i]))
            self.scales[i] = weighted_data_sum / (total_resp[i] * self.alphas[i])
            if np.isnan(self.alphas[i]) or np.isnan(self.scales[i]):
                raise ValueError(f"NaN encountered in alphas or scales during M-step for component {i}.")
            print(f"Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

    def _warm_start(self, data: np.ndarray):
        """
        Warm start the parameters using K-means clustering.
        Args:
            data (np.ndarray): Observed data.
        """
        kmeans = KMeans(n_clusters=self.n_components, random_state=0)
        labels = kmeans.fit_predict(data.reshape(-1, 1))
        for i in range(self.n_components):
            cluster_data = data[labels == i]
            if len(cluster_data) == 0:
                continue
            data_mean = np.mean(cluster_data)
            data_var = np.var(cluster_data)
            self.alphas[i] = data_mean ** 2 / data_var
            self.scales[i] = data_var / data_mean
            self.weights[i] = len(cluster_data) / len(data)
            print(f"Warm start Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

    def fit(self, data: np.ndarray, tol: float = 1e-6, max_iter: int = 100):
        """
        Fit the Gamma Mixture Model to the data.
        Args:
            data (np.ndarray): Observed data.
            tol (float): Tolerance for convergence.
            max_iter (int): Maximum number of iterations.
        Raises:
            ValueError: If NaN values appear in the parameters during or after fitting.
        """
        self._warm_start(data)
        log_likelihood_prev = -np.inf
        for iteration in range(max_iter):
            responsibilities = self._e_step(data)
            self._m_step(data, responsibilities)
            log_likelihood = np.sum(np.log(np.sum([w * gamma.pdf(data, a, scale=s) for w, a, s in zip(self.weights, self.alphas, self.scales)], axis=0)))
            print(f"Iteration {iteration}: log_likelihood={log_likelihood}")
            if np.abs(log_likelihood - log_likelihood_prev) < tol:
                break
            log_likelihood_prev = log_likelihood
        if np.any(np.isnan(self.weights)) or np.any(np.isnan(self.alphas)) or np.any(np.isnan(self.scales)):
            raise ValueError("NaN encountered in parameters after fitting.")
        self.fitted = True

    def sample(self, n_samples: int) -> np.ndarray:
        """
        Sample from the fitted Gamma Mixture Model.
        Args:
            n_samples (int): Number of samples to generate.
        Returns:
            np.ndarray: Samples generated from the model.
        Raises:
            RuntimeError: If the model has not been fitted yet.
        """
        if not self.fitted:
            raise RuntimeError("Model has not been fitted yet. Fit the model first.")
        samples = np.zeros(n_samples)
        component_samples = np.random.choice(self.n_components, size=n_samples, p=self.weights)
        for i in range(self.n_components):
            n_component_samples = np.sum(component_samples == i)
            if n_component_samples > 0:
                samples[component_samples == i] = gamma.rvs(a=self.alphas[i], scale=self.scales[i], size=n_component_samples)
        return samples
# Example usage
np.random.seed(0)
data = np.concatenate([
    gamma.rvs(a=2, scale=2, size=300),
    gamma.rvs(a=5, scale=1, size=300),
    gamma.rvs(a=9, scale=0.5, size=400)
])
gamma_mixture = GammaMixture(n_components=3)
gamma_mixture.fit(data)
samples = gamma_mixture.sample(n_samples=1000)
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(8, 6))
sns.histplot(data, color='blue', kde=True, label='Observed', stat='density')
sns.histplot(samples, color='red', kde=True, label='Sampled', stat='density')
plt.title('Distribution of Observed vs Sampled Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
plt.show()
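For context on the M-step above: as far as I know there is no closed-form MLE for the gamma shape parameter, so the shape update has to be an approximation. One commonly cited closed-form approximation works from s = log(weighted mean) - (weighted mean of log x); here is a sketch of what I believe that looks like (approx_gamma_shape is just an illustrative name, and I am not certain the update line in my code matches it):

import numpy as np

def approx_gamma_shape(data, resp):
    # Weighted version of the usual closed-form approximation to the gamma shape MLE:
    # alpha ~= (3 - s + sqrt((s - 3)**2 + 24*s)) / (12*s), with
    # s = log(weighted mean) - weighted mean of log(x), which is >= 0 by Jensen's inequality.
    total = np.sum(resp)
    mean = np.sum(resp * data) / total
    mean_log = np.sum(resp * np.log(data)) / total
    s = np.log(mean) - mean_log
    return (3.0 - s + np.sqrt((s - 3.0) ** 2 + 24.0 * s)) / (12.0 * s)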
But that also gave either convergence issues or visually poor agreement with the data. Lastly, I tried a warm start based on the method of moments:
import numpy as np
from scipy.stats import gamma
class GammaMixture:
    def __init__(self, n_components: int):
        """
        Initialize the Gamma Mixture Model.
        Args:
            n_components (int): Number of gamma distributions (components) in the mixture.
        """
        self.n_components = n_components
        self.weights = np.ones(n_components) / n_components
        self.alphas = np.ones(n_components)
        self.scales = np.ones(n_components)
        self.fitted = False

    def _e_step(self, data: np.ndarray) -> np.ndarray:
        """
        E-step: Calculate the responsibilities.
        Args:
            data (np.ndarray): Observed data.
        Returns:
            np.ndarray: Responsibilities of each component for each data point.
        """
        responsibilities = np.zeros((data.shape[0], self.n_components))
        for i in range(self.n_components):
            responsibilities[:, i] = self.weights[i] * gamma.pdf(data, a=self.alphas[i], scale=self.scales[i])
        sum_responsibilities = np.sum(responsibilities, axis=1).reshape(-1, 1)
        if np.any(sum_responsibilities == 0):
            raise ValueError("Some data points have zero responsibilities.")
        responsibilities /= sum_responsibilities
        return responsibilities

    def _m_step(self, data: np.ndarray, responsibilities: np.ndarray):
        """
        M-step: Update the parameters of the gamma distributions and the weights.
        Args:
            data (np.ndarray): Observed data.
            responsibilities (np.ndarray): Responsibilities of each component for each data point.
        """
        total_resp = np.sum(responsibilities, axis=0)
        self.weights = total_resp / data.shape[0]
        for i in range(self.n_components):
            resp = responsibilities[:, i]
            weighted_data_sum = np.sum(resp * data)
            weighted_log_data_sum = np.sum(resp * np.log(data))
            if total_resp[i] == 0 or weighted_data_sum == 0 or weighted_log_data_sum == 0:
                raise ValueError(f"Invalid weighted sums for component {i}: total_resp={total_resp[i]}, weighted_data_sum={weighted_data_sum}, weighted_log_data_sum={weighted_log_data_sum}")
            # Shape set to total_resp / sum(resp * log x), i.e. 1 / (weighted mean of log x);
            # scale then chosen so that alpha * scale matches the weighted mean.
            self.alphas[i] = total_resp[i] / weighted_log_data_sum
            self.scales[i] = (weighted_data_sum / total_resp[i]) / self.alphas[i]
            if np.isnan(self.alphas[i]) or np.isnan(self.scales[i]):
                raise ValueError(f"NaN encountered in alphas or scales during M-step for component {i}.")
            print(f"Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

    def _warm_start(self, data: np.ndarray):
        """
        Warm start the parameters using the method of moments.
        Args:
            data (np.ndarray): Observed data.
        """
        data_mean = np.mean(data)
        data_var = np.var(data)
        for i in range(self.n_components):
            self.alphas[i] = data_mean ** 2 / data_var
            self.scales[i] = data_var / data_mean
            self.weights[i] = 1 / self.n_components
            print(f"Warm start Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

    def fit(self, data: np.ndarray, tol: float = 1e-6, max_iter: int = 100):
        """
        Fit the Gamma Mixture Model to the data.
        Args:
            data (np.ndarray): Observed data.
            tol (float): Tolerance for convergence.
            max_iter (int): Maximum number of iterations.
        Raises:
            ValueError: If NaN values appear in the parameters during or after fitting.
        """
        self._warm_start(data)
        log_likelihood_prev = -np.inf
        for iteration in range(max_iter):
            responsibilities = self._e_step(data)
            self._m_step(data, responsibilities)
            log_likelihood = np.sum(np.log(np.sum([w * gamma.pdf(data, a, scale=s) for w, a, s in zip(self.weights, self.alphas, self.scales)], axis=0)))
            print(f"Iteration {iteration}: log_likelihood={log_likelihood}")
            if np.abs(log_likelihood - log_likelihood_prev) < tol:
                break
            log_likelihood_prev = log_likelihood
        if np.any(np.isnan(self.weights)) or np.any(np.isnan(self.alphas)) or np.any(np.isnan(self.scales)):
            raise ValueError("NaN encountered in parameters after fitting.")
        self.fitted = True

    def sample(self, n_samples: int) -> np.ndarray:
        """
        Sample from the fitted Gamma Mixture Model.
        Args:
            n_samples (int): Number of samples to generate.
        Returns:
            np.ndarray: Samples generated from the model.
        Raises:
            RuntimeError: If the model has not been fitted yet.
        """
        if not self.fitted:
            raise RuntimeError("Model has not been fitted yet. Fit the model first.")
        samples = np.zeros(n_samples)
        component_samples = np.random.choice(self.n_components, size=n_samples, p=self.weights)
        for i in range(self.n_components):
            n_component_samples = np.sum(component_samples == i)
            if n_component_samples > 0:
                samples[component_samples == i] = gamma.rvs(a=self.alphas[i], scale=self.scales[i], size=n_component_samples)
        return samples
# Example usage
np.random.seed(0)
data = np.concatenate([
    gamma.rvs(a=2, scale=2, size=300),
    gamma.rvs(a=5, scale=1, size=300),
    gamma.rvs(a=9, scale=0.5, size=400)
])
gamma_mixture = GammaMixture(n_components=3)
gamma_mixture.fit(data)
samples = gamma_mixture.sample(n_samples=1000)
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(8, 6))
sns.histplot(data, color='blue', kde=True, label='Observed', stat='density')
sns.histplot(samples, color='red', kde=True, label='Sampled', stat='density')
plt.title('Distribution of Observed vs Sampled Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
plt.show()
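(For reference, the warm start in this last version just applies the single-gamma moment-matching relations alpha = mean^2 / var and scale = var / mean to the pooled data, so every component starts from the same shape and scale and they only differentiate through the E/M updates.)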
How should I actually go about fitting a mixture of gamma distributions without using Bayesian methods?