Skip to content

Add GATE and CATE for IRM models #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
*.idea
4 changes: 3 additions & 1 deletion doubleml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from .double_ml_irm import DoubleMLIRM
from .double_ml_iivm import DoubleMLIIVM
from .double_ml_data import DoubleMLData, DoubleMLClusterData
from .double_ml_blp import DoubleMLBLP

__all__ = ['DoubleMLPLR',
'DoubleMLPLIV',
'DoubleMLIRM',
'DoubleMLIIVM',
'DoubleMLData',
'DoubleMLClusterData']
'DoubleMLClusterData',
'DoubleMLBLP']

__version__ = get_distribution('doubleml').version
224 changes: 224 additions & 0 deletions doubleml/double_ml_blp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import statsmodels.api as sm
import numpy as np
import pandas as pd

from scipy.stats import norm
from scipy.linalg import sqrtm


class DoubleMLBLP:
"""Best linear predictor (BLP) for DoubleML with orthogonal signals.

Parameters
----------
orth_signal : :class:`numpy.array`
The orthogonal signal to be predicted. Has to be of shape ``(n_obs,)``,
where ``n_obs`` is the number of observations.

basis : :class:`pandas.DataFrame`
The basis for estimating the best linear predictor. Has to have the shape ``(n_obs, d)``,
where ``n_obs`` is the number of observations and ``d`` is the number of predictors.

is_gate : bool
Indicates whether the basis is constructed for GATEs (dummy-basis).
Default is ``False``.
"""

def __init__(self,
orth_signal,
basis,
is_gate=False):

if not isinstance(orth_signal, np.ndarray):
raise TypeError('The signal must be of np.ndarray type. '
f'Signal of type {str(type(orth_signal))} was passed.')

if orth_signal.ndim != 1:
raise ValueError('The signal must be of one dimensional. '
f'Signal of dimensions {str(orth_signal.ndim)} was passed.')

if not isinstance(basis, pd.DataFrame):
raise TypeError('The basis must be of DataFrame type. '
f'Basis of type {str(type(basis))} was passed.')

if not basis.columns.is_unique:
raise ValueError('Invalid pd.DataFrame: '
'Contains duplicate column names.')

self._orth_signal = orth_signal
self._basis = basis
self._is_gate = is_gate

# initialize the score and the covariance
self._blp_model = None
self._blp_omega = None

def __str__(self):
class_name = self.__class__.__name__
header = f'================== {class_name} Object ==================\n'
fit_summary = str(self.summary)
res = header + \
'\n------------------ Fit summary ------------------\n' + fit_summary
return res

@property
def blp_model(self):
"""
Best-Linear-Predictor model.
"""
return self._blp_model

@property
def orth_signal(self):
"""
Orthogonal signal.
"""
return self._orth_signal

@property
def basis(self):
"""
Basis.
"""
return self._basis

@property
def blp_omega(self):
"""
Covariance matrix.
"""
return self._blp_omega

@property
def summary(self):
"""
A summary for the best linear predictor effect after calling :meth:`fit`.
"""
col_names = ['coef', 'std err', 't', 'P>|t|', '[0.025', '0.975]']
if self.blp_model is None:
df_summary = pd.DataFrame(columns=col_names)
else:
summary_stats = {'coef': self.blp_model.params,
'std err': self.blp_model.bse,
't': self.blp_model.tvalues,
'P>|t|': self.blp_model.pvalues,
'[0.025': self.blp_model.conf_int()[0],
'0.975]': self.blp_model.conf_int()[1]}
df_summary = pd.DataFrame(summary_stats,
columns=col_names)
return df_summary

def fit(self):
"""
Estimate DoubleML models.

Returns
-------
self : object
"""

# fit the best-linear-predictor of the orthogonal signal with respect to the grid
self._blp_model = sm.OLS(self._orth_signal, self._basis).fit()
self._blp_omega = self._blp_model.cov_HC0

return self

def confint(self, basis=None, joint=False, level=0.95, n_rep_boot=500):
"""
Confidence intervals for the BLP model.

Parameters
----------
basis : :class:`pandas.DataFrame`
The basis for constructing the confidence interval. Has to have the same form as the basis from
the construction. If ``None`` the basis for the construction of the model is used.
Default is ``None``

joint : bool
Indicates whether joint confidence intervals are computed.
Default is ``False``

level : float
The confidence level.
Default is ``0.95``.

n_rep_boot : int
The number of bootstrap repetitions (only relevant for joint confidence intervals).
Default is ``500``.

Returns
-------
df_ci : pd.DataFrame
A data frame with the confidence interval(s).
"""
if not isinstance(joint, bool):
raise TypeError('joint must be True or False. '
f'Got {str(joint)}.')

if not isinstance(level, float):
raise TypeError('The confidence level must be of float type. '
f'{str(level)} of type {str(type(level))} was passed.')
if (level <= 0) | (level >= 1):
raise ValueError('The confidence level must be in (0,1). '
f'{str(level)} was passed.')

if not isinstance(n_rep_boot, int):
raise TypeError('The number of bootstrap replications must be of int type. '
f'{str(n_rep_boot)} of type {str(type(n_rep_boot))} was passed.')
if n_rep_boot < 1:
raise ValueError('The number of bootstrap replications must be positive. '
f'{str(n_rep_boot)} was passed.')

if self._blp_model is None:
raise ValueError('Apply fit() before confint().')

alpha = 1 - level
gate_names = None
# define basis if none is supplied
if basis is None:
if self._is_gate:
# reduce to unique groups
basis = pd.DataFrame(np.diag(v=np.full((self._basis.shape[1]), True)))
gate_names = list(self._basis.columns.values)
else:
basis = self._basis
elif not (basis.shape[1] == self._basis.shape[1]):
raise ValueError('Invalid basis: DataFrame has to have the exact same number and ordering of columns.')
elif not list(basis.columns.values) == list(self._basis.columns.values):
raise ValueError('Invalid basis: DataFrame has to have the exact same number and ordering of columns.')

# blp of the orthogonal signal
g_hat = self._blp_model.predict(basis)

np_basis = basis.to_numpy()
# calculate se for basis elements
blp_se = np.sqrt((np.dot(np_basis, self._blp_omega) * np_basis).sum(axis=1))

if joint:
# calculate the maximum t-statistic with bootstrap
normal_samples = np.random.normal(size=[basis.shape[1], n_rep_boot])
bootstrap_samples = np.multiply(np.dot(np_basis, np.dot(sqrtm(self._blp_omega), normal_samples)).T,
(1.0 / blp_se))

max_t_stat = np.quantile(np.max(np.abs(bootstrap_samples), axis=0), q=level)

# Lower simultaneous CI
g_hat_lower = g_hat - max_t_stat * blp_se
# Upper simultaneous CI
g_hat_upper = g_hat + max_t_stat * blp_se

else:
# Lower point-wise CI
g_hat_lower = g_hat + norm.ppf(q=alpha / 2) * blp_se
# Upper point-wise CI
g_hat_upper = g_hat + norm.ppf(q=1 - alpha / 2) * blp_se

ci = np.vstack((g_hat_lower, g_hat, g_hat_upper)).T
df_ci = pd.DataFrame(ci,
columns=['{:.1f} %'.format(alpha/2 * 100), 'effect', '{:.1f} %'.format((1-alpha/2) * 100)],
index=basis.index)

if self._is_gate and gate_names is not None:
df_ci.index = gate_names

return df_ci
7 changes: 3 additions & 4 deletions doubleml/double_ml_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,10 +725,9 @@ def from_arrays(cls, x, y, d, cluster_vars, z=None, use_other_treat_as_covariate

data = pd.concat((pd.DataFrame(cluster_vars, columns=cluster_cols), dml_data.data), axis=1)

return(cls(data, dml_data.y_col, dml_data.d_cols,
cluster_cols,
dml_data.x_cols, dml_data.z_cols,
dml_data.use_other_treat_as_covariate, dml_data.force_all_x_finite))
return (cls(data, dml_data.y_col, dml_data.d_cols, cluster_cols,
dml_data.x_cols, dml_data.z_cols,
dml_data.use_other_treat_as_covariate, dml_data.force_all_x_finite))

@property
def cluster_cols(self):
Expand Down
4 changes: 2 additions & 2 deletions doubleml/double_ml_iivm.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def _check_data(self, obj_dml_data):
one_treat = (obj_dml_data.n_treat == 1)
binary_treat = (type_of_target(obj_dml_data.d) == 'binary')
zero_one_treat = np.all((np.power(obj_dml_data.d, 2) - obj_dml_data.d) == 0)
if not(one_treat & binary_treat & zero_one_treat):
if not (one_treat & binary_treat & zero_one_treat):
raise ValueError('Incompatible data. '
'To fit an IIVM model with DML '
'exactly one binary variable with values 0 and 1 '
Expand All @@ -219,7 +219,7 @@ def _check_data(self, obj_dml_data):
if one_instr:
binary_instr = (type_of_target(obj_dml_data.z) == 'binary')
zero_one_instr = np.all((np.power(obj_dml_data.z, 2) - obj_dml_data.z) == 0)
if not(one_instr & binary_instr & zero_one_instr):
if not (one_instr & binary_instr & zero_one_instr):
raise ValueError(err_msg)
else:
raise ValueError(err_msg)
Expand Down
84 changes: 83 additions & 1 deletion doubleml/double_ml_irm.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import numpy as np
import pandas as pd
import warnings
from sklearn.utils import check_X_y
from sklearn.utils.multiclass import type_of_target

from .double_ml import DoubleML

from .double_ml_blp import DoubleMLBLP
from .double_ml_data import DoubleMLData
from .double_ml_score_mixins import LinearScoreMixin

from ._utils import _dml_cv_predict, _get_cond_smpls, _dml_tune, _check_finite_predictions


Expand Down Expand Up @@ -171,7 +176,7 @@ def _check_data(self, obj_dml_data):
one_treat = (obj_dml_data.n_treat == 1)
binary_treat = (type_of_target(obj_dml_data.d) == 'binary')
zero_one_treat = np.all((np.power(obj_dml_data.d, 2) - obj_dml_data.d) == 0)
if not(one_treat & binary_treat & zero_one_treat):
if not (one_treat & binary_treat & zero_one_treat):
raise ValueError('Incompatible data. '
'To fit an IRM model with DML '
'exactly one binary variable with values 0 and 1 '
Expand Down Expand Up @@ -325,3 +330,80 @@ def _nuisance_tuning(self, smpls, param_grids, scoring_methods, n_folds_tune, n_
'tune_res': tune_res}

return res

def cate(self, basis):
"""
Calculate conditional average treatment effects (CATE) for a given basis.

Parameters
----------
basis : :class:`pandas.DataFrame`
The basis for estimating the best linear predictor. Has to have the shape ``(n_obs, d)``,
where ``n_obs`` is the number of observations and ``d`` is the number of predictors.

Returns
-------
model : :class:`doubleML.DoubleMLBLP`
Best linear Predictor model.
"""
valid_score = ['ATE']
if self.score not in valid_score:
raise ValueError('Invalid score ' + self.score + '. ' +
'Valid score ' + ' or '.join(valid_score) + '.')

if self.n_rep != 1:
raise NotImplementedError('Only implemented for one repetition. ' +
f'Number of repetitions is {str(self.n_rep)}.')

# define the orthogonal signal
orth_signal = self.psi_elements['psi_b'].reshape(-1)
# fit the best linear predictor
model = DoubleMLBLP(orth_signal, basis=basis).fit()

return model

def gate(self, groups):
"""
Calculate group average treatment effects (GATE) for mutually exclusive groups.

Parameters
----------
groups : :class:`pandas.DataFrame`
The group indicator for estimating the best linear predictor.
Has to be dummy coded with shape ``(n_obs, d)``, where ``n_obs`` is the number of observations
and ``d`` is the number of groups or ``(n_obs, 1)`` and contain the corresponding groups.

Returns
-------
model : :class:`doubleML.DoubleMLBLPGATE`
Best linear Predictor model for Group Effects.
"""
valid_score = ['ATE']
if self.score not in valid_score:
raise ValueError('Invalid score ' + self.score + '. ' +
'Valid score ' + ' or '.join(valid_score) + '.')

if self.n_rep != 1:
raise NotImplementedError('Only implemented for one repetition. ' +
f'Number of repetitions is {str(self.n_rep)}.')

if not isinstance(groups, pd.DataFrame):
raise TypeError('Groups must be of DataFrame type. '
f'Groups of type {str(type(groups))} was passed.')

if not all(groups.dtypes == bool) or all(groups.dtypes == int):
if groups.shape[1] == 1:
groups = pd.get_dummies(groups, prefix='Group', prefix_sep='_')
else:
raise TypeError('Columns of groups must be of bool type or int type (dummy coded). '
'Alternatively, groups should only contain one column.')

if any(groups.sum(0) <= 5):
warnings.warn('At least one group effect is estimated with less than 6 observations.')

# define the orthogonal signal
orth_signal = self.psi_elements['psi_b'].reshape(-1)
# fit the best linear predictor for GATE (different confint() method)
model = DoubleMLBLP(orth_signal, basis=groups, is_gate=True).fit()

return model
Loading