Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions src/microplex_us/pipelines/check_export_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@
# Path to the committed contract shipped alongside this module.
DEFAULT_CONTRACT_PATH = Path(__file__).with_name("ecps_export_contract.json")

# Exported income columns for which compute_support_diff requests signed
# support: when the baseline column has both positive and negative values,
# the candidate is required to have both as well.
SIGNED_NUMERIC_SUPPORT_COLUMNS = frozenset(
    (
        "farm_income",
        "farm_operations_income",
        "partnership_s_corp_income",
        "rental_income",
    )
)


@dataclass
class ColumnDiff:
Expand All @@ -74,6 +83,8 @@ class ColumnSupportStats:
kind: str
row_count: int
nonzero_count: int | None
positive_count: int | None
negative_count: int | None
unique_count: int


Expand Down Expand Up @@ -145,7 +156,8 @@ def compute_support_diff(
*populates* a required exported column, MP must populate it too:

- numeric columns: eCPS has at least one nonzero value, so MP must also
have at least one value of a sign eCPS has (positive when eCPS has any
positive value, otherwise negative). Declared signed-income exports must
also preserve positive/negative support when eCPS has it;
- boolean/string/categorical columns: eCPS has more than one unique value,
so MP must also vary.

Expand Down Expand Up @@ -178,7 +190,10 @@ def compute_support_diff(
continue
checked_columns.append(column)
baseline_stats = _support_stats(column, baseline_values)
requirement = _support_requirement(baseline_stats)
requirement = _support_requirement(
baseline_stats,
require_signed_numeric=column in SIGNED_NUMERIC_SUPPORT_COLUMNS,
)
if requirement is None:
baseline_filler_columns.append(column)
continue
Expand Down Expand Up @@ -246,16 +261,22 @@ def _support_stats(column: str, values) -> ColumnSupportStats:
unique_count = int(len(np.unique(flattened))) if flattened.size else 0
kind = _support_kind(flattened)
nonzero_count: int | None = None
positive_count: int | None = None
negative_count: int | None = None
if kind == "numeric":
numeric = flattened
if np.issubdtype(numeric.dtype, np.floating):
numeric = numeric[np.isfinite(numeric)]
nonzero_count = int(np.count_nonzero(numeric))
positive_count = int(np.count_nonzero(numeric > 0))
negative_count = int(np.count_nonzero(numeric < 0))
return ColumnSupportStats(
column=column,
kind=kind,
row_count=int(flattened.size),
nonzero_count=nonzero_count,
positive_count=positive_count,
negative_count=negative_count,
unique_count=unique_count,
)

Expand All @@ -272,10 +293,24 @@ def _support_kind(values) -> str:
return "categorical"


def _support_requirement(stats: ColumnSupportStats) -> str | None:
def _support_requirement(
stats: ColumnSupportStats,
*,
require_signed_numeric: bool = True,
) -> str | None:
"""Return the support MP must match for an eCPS column, if any."""
if stats.kind == "numeric":
return "numeric_nonzero" if (stats.nonzero_count or 0) > 0 else None
if (stats.nonzero_count or 0) <= 0:
return None
has_positive = (stats.positive_count or 0) > 0
has_negative = (stats.negative_count or 0) > 0
if require_signed_numeric and has_positive and has_negative:
return "numeric_signed"
if has_positive:
return "numeric_positive"
if has_negative:
return "numeric_negative"
return "numeric_nonzero"
return "categorical_variation" if stats.unique_count > 1 else None


Expand All @@ -287,10 +322,23 @@ def _satisfies_support_requirement(
"""Return whether candidate stats meet an eCPS-derived requirement."""
if stats is None:
return False
if requirement == "numeric_nonzero":
if requirement in {
"numeric_nonzero",
"numeric_positive",
"numeric_negative",
"numeric_signed",
}:
if stats.kind != "numeric":
return stats.unique_count > 1
return (stats.nonzero_count or 0) > 0
if requirement == "numeric_nonzero":
return (stats.nonzero_count or 0) > 0
if requirement == "numeric_positive":
return (stats.positive_count or 0) > 0
if requirement == "numeric_negative":
return (stats.negative_count or 0) > 0
return (stats.positive_count or 0) > 0 and (
stats.negative_count or 0
) > 0
if requirement == "categorical_variation":
return stats.unique_count > 1
raise ValueError(f"Unknown support requirement: {requirement}")
Expand Down Expand Up @@ -438,7 +486,10 @@ def _compact_stats(stats: ColumnSupportStats | None) -> str:
if stats is None:
return "missing"
if stats.kind == "numeric":
return f"nonzero {stats.nonzero_count}/{stats.row_count}"
return (
f"nonzero {stats.nonzero_count}/{stats.row_count}; "
f"+{stats.positive_count}, -{stats.negative_count}"
)
return f"unique {stats.unique_count}/{stats.row_count}"


Expand Down
45 changes: 34 additions & 11 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -10970,6 +10970,32 @@ def first_nonzero_or_present(*columns: str) -> pd.Series:
def has_any(*columns: str) -> bool:
    """Return whether at least one of ``columns`` is a column of ``result``."""
    for name in columns:
        if name in result.columns:
            return True
    return False

def signed_rental_income() -> pd.Series:
    """Return rental income, recomposed from split columns when available.

    When either ``rental_income_positive`` or ``rental_income_negative`` is a
    column of ``result``, the value is the positive part minus the negative
    part. Otherwise it is whatever ``first_present("rental_income")`` yields.
    """
    if not has_any("rental_income_positive", "rental_income_negative"):
        return first_present("rental_income")
    gains = first_present("rental_income_positive")
    losses = first_present("rental_income_negative")
    return gains - losses

def first_signed_or_present(*columns: str) -> pd.Series:
    """Return the first present column, overlaid by the first one with losses.

    Columns are scanned in order, skipping names that are not columns of
    ``result``. The first present column is remembered as the fallback. As
    soon as a present column containing any negative value is found, that
    column is returned with its exact-zero entries replaced by the fallback's
    values. If no present column has a negative value, the fallback is
    returned; if none of the columns is present, a copy of ``zero`` is
    returned.
    """
    fallback: pd.Series | None = None
    for column in columns:
        if column not in result.columns:
            continue
        candidate = first_present(column)
        if fallback is None:
            fallback = candidate
        if candidate.lt(0.0).any():
            # fallback is always set by this point, so no None check is
            # needed before using it to fill the candidate's zero entries.
            return candidate.where(candidate.ne(0.0), fallback)
    return zero.copy() if fallback is None else fallback

signed_self_employment_income = first_signed_or_present(
"self_employment_income_before_lsr",
"self_employment_income",
)

if "is_female" in result.columns:
result["is_female"] = result["is_female"].fillna(False).astype(bool)
elif "sex" in result.columns:
Expand Down Expand Up @@ -11104,13 +11130,10 @@ def has_any(*columns: str) -> bool:
result["takes_up_ssi_if_eligible"] = first_present("ssi").gt(0.0)

known_nonemployment = (
first_nonzero_or_present(
"self_employment_income_before_lsr",
"self_employment_income",
)
signed_self_employment_income
+ first_nonzero_or_present("taxable_interest_income", "interest_income")
+ first_nonzero_or_present("ordinary_dividend_income", "dividend_income")
+ first_present("rental_income")
+ signed_rental_income()
+ first_present("gross_social_security", "social_security")
+ first_present("ssi")
+ first_present("public_assistance")
Expand All @@ -11133,10 +11156,7 @@ def has_any(*columns: str) -> bool:
)
else fallback_employment_income
)
result["self_employment_income_before_lsr"] = first_nonzero_or_present(
"self_employment_income_before_lsr",
"self_employment_income",
)
result["self_employment_income_before_lsr"] = signed_self_employment_income
result["taxable_interest_income"] = first_nonzero_or_present(
"taxable_interest_income",
"interest_income",
Expand Down Expand Up @@ -11189,10 +11209,13 @@ def has_any(*columns: str) -> bool:
result["partnership_s_corp_income"] = first_present("partnership_s_corp_income")
result["partnership_se_income"] = first_present("partnership_se_income")
result["estate_income"] = first_present("estate_income")
result["farm_income"] = first_present("farm_income")
result["farm_income"] = first_signed_or_present(
"farm_income",
"farm_operations_income",
)
result["farm_operations_income"] = first_present("farm_operations_income")
result["farm_rent_income"] = first_present("farm_rent_income")
result["rental_income"] = first_present("rental_income")
result["rental_income"] = signed_rental_income()
result["w2_wages_from_qualified_business"] = first_present(
"w2_wages_from_qualified_business"
).clip(lower=0.0)
Expand Down
83 changes: 83 additions & 0 deletions tests/pipelines/test_check_export_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,89 @@ def test_support_baseline_rejects_numeric_column_eCPS_populates(
assert rc == 1


def test_support_baseline_rejects_missing_numeric_sign_support(
    tmp_path,
):
    """A signed column lacking the baseline's negative support fails the check."""
    contract_file = _write_json(
        tmp_path / "contract.json",
        {
            "required": ["age", "snap", "rental_income"],
            "ecps_internal_optional": [],
            "forbidden": [],
        },
    )
    # The candidate has only positive rental income; the baseline adds a loss.
    candidate_file = _write_period_h5(
        tmp_path / "candidate.h5",
        {
            "age": [34, 42, 50],
            "snap": [False, True, False],
            "rental_income": [0.0, 12_000.0, 0.0],
        },
    )
    baseline_file = _write_period_h5(
        tmp_path / "baseline.h5",
        {
            "age": [34, 42, 50],
            "snap": [False, True, False],
            "rental_income": [-200.0, 12_000.0, 0.0],
        },
    )
    diagnostics_file = tmp_path / "support.json"
    argv = [
        str(candidate_file),
        "--contract",
        str(contract_file),
        "--support-baseline",
        str(baseline_file),
        "--support-diagnostics-json",
        str(diagnostics_file),
    ]

    exit_code = main(argv)

    assert exit_code == 1
    first_issue = json.loads(diagnostics_file.read_text())["issues"][0]
    assert first_issue["column"] == "rental_income"
    assert first_issue["requirement"] == "numeric_signed"
    assert first_issue["baseline"]["negative_count"] == 1
    assert first_issue["candidate"]["negative_count"] == 0


def test_support_baseline_accepts_negative_noise_for_unsigned_numeric(
    tmp_path,
    contract_path,
):
    """A baseline negative in ``employment_income`` does not fail the candidate."""
    # Only the baseline carries a negative employment_income value.
    candidate_file = _write_period_h5(
        tmp_path / "candidate.h5",
        {
            "age": [34, 42, 50],
            "snap": [False, True, False],
            "employment_income": [0.0, 12_000.0, 0.0],
        },
    )
    baseline_file = _write_period_h5(
        tmp_path / "baseline.h5",
        {
            "age": [34, 42, 50],
            "snap": [False, True, False],
            "employment_income": [-200.0, 12_000.0, 0.0],
        },
    )
    argv = [
        str(candidate_file),
        "--contract",
        str(contract_path),
        "--support-baseline",
        str(baseline_file),
    ]

    exit_code = main(argv)

    assert exit_code == 0


def test_support_baseline_rejects_categorical_column_eCPS_varies(
tmp_path,
contract_path,
Expand Down
2 changes: 1 addition & 1 deletion tests/pipelines/test_mp300k_artifact_gates.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def test_export_support_gate_rejects_ecps_populated_numeric_filler(tmp_path):
assert support_gate["status"] == "fail"
assert support_gate["metrics"]["unsupported_populated_export_column_count"] == 1
assert support_gate["details"]["issues"][0]["column"] == "hourly_wage"
assert support_gate["details"]["issues"][0]["requirement"] == "numeric_nonzero"
assert support_gate["details"]["issues"][0]["requirement"] == "numeric_positive"


def test_export_support_gate_rejects_ecps_varied_categorical_filler(tmp_path):
Expand Down
54 changes: 54 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -5247,6 +5247,60 @@ def test_augment_policyengine_person_inputs_aliases_rent_to_pre_subsidy_rent(
9_600.0,
]

def test_augment_policyengine_person_inputs_recomposes_signed_rental_income(
    self,
):
    """Rental income is rebuilt as positive minus negative split columns."""
    frame = pd.DataFrame(
        {
            "age": [45, 50, 55],
            "sex": [1, 2, 1],
            "income": [1_000.0, 1_000.0, 1_000.0],
            "rental_income": [900.0, 900.0, 900.0],
            "rental_income_positive": [300.0, 0.0, 50.0],
            "rental_income_negative": [100.0, 200.0, 0.0],
        }
    )
    builder = USMicroplexPipeline(USMicroplexBuildConfig())

    out = builder._augment_policyengine_person_inputs(frame)

    # The pre-existing rental_income values (900.0) must not survive.
    expected_rental = [200.0, -200.0, 50.0]
    expected_employment = [800.0, 1_200.0, 950.0]
    assert out["rental_income"].tolist() == expected_rental
    assert out["employment_income_before_lsr"].tolist() == expected_employment

def test_augment_policyengine_person_inputs_prefers_signed_business_losses(
    self,
):
    """A later column with losses wins, zero-filled from the first column."""
    frame = pd.DataFrame(
        {
            "age": [45, 50, 55],
            "sex": [1, 2, 1],
            "income": [1_000.0, 1_000.0, 1_000.0],
            "self_employment_income_before_lsr": [50.0, 60.0, 70.0],
            "self_employment_income": [100.0, -25.0, 0.0],
            "farm_income": [20.0, 30.0, 40.0],
            "farm_operations_income": [10.0, -15.0, 0.0],
        }
    )
    builder = USMicroplexPipeline(USMicroplexBuildConfig())

    out = builder._augment_policyengine_person_inputs(frame)

    # Zero entries in the loss-bearing column fall back to the first column.
    expected_self_employment = [100.0, -25.0, 70.0]
    expected_farm = [10.0, -15.0, 40.0]
    expected_employment = [900.0, 1_025.0, 930.0]
    assert (
        out["self_employment_income_before_lsr"].tolist()
        == expected_self_employment
    )
    assert out["farm_income"].tolist() == expected_farm
    assert out["employment_income_before_lsr"].tolist() == expected_employment

def test_augment_policyengine_person_inputs_zeros_part_b_without_medicare(
self,
):
Expand Down
Loading