# Patch overview (commentary ahead of the first "diff --git" header).
#
# NOTE: in this copy of the patch the newlines inside the hunks are collapsed
# into spaces, so several hunks (and several files) share one physical line.
#
# Files touched:
#   src/microplex_us/pipelines/check_export_columns.py
#   src/microplex_us/pipelines/us.py
#   tests/pipelines/test_check_export_columns.py
#   tests/pipelines/test_mp300k_artifact_gates.py
#   tests/pipelines/test_us.py
#
# check_export_columns.py
#   * Adds SIGNED_NUMERIC_SUPPORT_COLUMNS, a frozenset of four column names:
#     farm_income, farm_operations_income, partnership_s_corp_income,
#     rental_income.
#   * ColumnSupportStats gains positive_count and negative_count
#     (int | None). _support_stats fills them for "numeric" columns with
#     counts of values > 0 and < 0, after non-finite values have been dropped
#     from floating-point arrays; they stay None for other kinds.
#   * _support_requirement gains a keyword-only require_signed_numeric flag
#     (default True). For numeric stats it returns None when there are no
#     nonzero values, "numeric_signed" when the flag is set and both signs
#     are present, otherwise "numeric_positive", "numeric_negative" or
#     "numeric_nonzero", checked in that order.
#   * compute_support_diff passes
#     require_signed_numeric=(column in SIGNED_NUMERIC_SUPPORT_COLUMNS).
#   * _satisfies_support_requirement accepts the four numeric requirement
#     names; a non-numeric candidate is still judged by unique_count > 1.
#   * _compact_stats appends "+<positive_count>, -<negative_count>" to the
#     numeric summary string.
#
# us.py
#   * signed_rental_income(): when either rental_income_positive or
#     rental_income_negative is a column of `result`, returns
#     first_present("rental_income_positive")
#     - first_present("rental_income_negative"); otherwise
#     first_present("rental_income").
#   * first_signed_or_present(*columns): walks the columns in order, keeps
#     the first present column as `fallback`, and returns the first present
#     column that contains any negative value, with its zero entries replaced
#     by the fallback's values. With no negative values anywhere it returns
#     the fallback, or zero.copy() when none of the columns is present.
#   * self_employment_income_before_lsr and farm_income are now produced by
#     first_signed_or_present; rental_income by signed_rental_income();
#     known_nonemployment uses both new values.
#
# tests
#   * New cases cover missing sign support for rental_income, negative
#     baseline values on a column outside SIGNED_NUMERIC_SUPPORT_COLUMNS,
#     rental income recomposition, and signed business losses.
#   * The hourly_wage gate expectation changes from "numeric_nonzero" to
#     "numeric_positive".
#
# NOTE(review): in first_signed_or_present, `fallback` is assigned before the
#   negative check on every iteration that reaches it, so the inner
#   `if fallback is not None:` is always true and the `return candidate`
#   after it is unreachable.
# NOTE(review): _support_requirement defaults require_signed_numeric to True,
#   while the only call visible here passes it explicitly. Any other caller
#   would get "numeric_signed" for every mixed-sign column -- confirm that
#   default is intended.
# NOTE(review): with require_signed_numeric=False and a mixed-sign baseline
#   the requirement is "numeric_positive", so negative support is never
#   demanded for undeclared columns; the new employment_income test relies on
#   this.
diff --git a/src/microplex_us/pipelines/check_export_columns.py b/src/microplex_us/pipelines/check_export_columns.py index c8458d0..e74bc75 100644 --- a/src/microplex_us/pipelines/check_export_columns.py +++ b/src/microplex_us/pipelines/check_export_columns.py @@ -51,6 +51,15 @@ # Path to the committed contract shipped alongside this module. DEFAULT_CONTRACT_PATH = Path(__file__).with_name("ecps_export_contract.json") +SIGNED_NUMERIC_SUPPORT_COLUMNS = frozenset( + { + "farm_income", + "farm_operations_income", + "partnership_s_corp_income", + "rental_income", + } +) + @dataclass class ColumnDiff: @@ -74,6 +83,8 @@ class ColumnSupportStats: kind: str row_count: int nonzero_count: int | None + positive_count: int | None + negative_count: int | None unique_count: int @@ -145,7 +156,8 @@ def compute_support_diff( *populates* a required exported column, MP must populate it too: - numeric columns: eCPS has at least one nonzero value, so MP must also - have at least one nonzero value; + have at least one nonzero value. Declared signed-income exports must + also preserve positive/negative support when eCPS has it; - boolean/string/categorical columns: eCPS has more than one unique value, so MP must also vary. 
@@ -178,7 +190,10 @@ def compute_support_diff( continue checked_columns.append(column) baseline_stats = _support_stats(column, baseline_values) - requirement = _support_requirement(baseline_stats) + requirement = _support_requirement( + baseline_stats, + require_signed_numeric=column in SIGNED_NUMERIC_SUPPORT_COLUMNS, + ) if requirement is None: baseline_filler_columns.append(column) continue @@ -246,16 +261,22 @@ def _support_stats(column: str, values) -> ColumnSupportStats: unique_count = int(len(np.unique(flattened))) if flattened.size else 0 kind = _support_kind(flattened) nonzero_count: int | None = None + positive_count: int | None = None + negative_count: int | None = None if kind == "numeric": numeric = flattened if np.issubdtype(numeric.dtype, np.floating): numeric = numeric[np.isfinite(numeric)] nonzero_count = int(np.count_nonzero(numeric)) + positive_count = int(np.count_nonzero(numeric > 0)) + negative_count = int(np.count_nonzero(numeric < 0)) return ColumnSupportStats( column=column, kind=kind, row_count=int(flattened.size), nonzero_count=nonzero_count, + positive_count=positive_count, + negative_count=negative_count, unique_count=unique_count, ) @@ -272,10 +293,24 @@ def _support_kind(values) -> str: return "categorical" -def _support_requirement(stats: ColumnSupportStats) -> str | None: +def _support_requirement( + stats: ColumnSupportStats, + *, + require_signed_numeric: bool = True, +) -> str | None: """Return the support MP must match for an eCPS column, if any.""" if stats.kind == "numeric": - return "numeric_nonzero" if (stats.nonzero_count or 0) > 0 else None + if (stats.nonzero_count or 0) <= 0: + return None + has_positive = (stats.positive_count or 0) > 0 + has_negative = (stats.negative_count or 0) > 0 + if require_signed_numeric and has_positive and has_negative: + return "numeric_signed" + if has_positive: + return "numeric_positive" + if has_negative: + return "numeric_negative" + return "numeric_nonzero" return "categorical_variation" 
if stats.unique_count > 1 else None @@ -287,10 +322,23 @@ def _satisfies_support_requirement( """Return whether candidate stats meet an eCPS-derived requirement.""" if stats is None: return False - if requirement == "numeric_nonzero": + if requirement in { + "numeric_nonzero", + "numeric_positive", + "numeric_negative", + "numeric_signed", + }: if stats.kind != "numeric": return stats.unique_count > 1 - return (stats.nonzero_count or 0) > 0 + if requirement == "numeric_nonzero": + return (stats.nonzero_count or 0) > 0 + if requirement == "numeric_positive": + return (stats.positive_count or 0) > 0 + if requirement == "numeric_negative": + return (stats.negative_count or 0) > 0 + return (stats.positive_count or 0) > 0 and ( + stats.negative_count or 0 + ) > 0 if requirement == "categorical_variation": return stats.unique_count > 1 raise ValueError(f"Unknown support requirement: {requirement}") @@ -438,7 +486,10 @@ def _compact_stats(stats: ColumnSupportStats | None) -> str: if stats is None: return "missing" if stats.kind == "numeric": - return f"nonzero {stats.nonzero_count}/{stats.row_count}" + return ( + f"nonzero {stats.nonzero_count}/{stats.row_count}; " + f"+{stats.positive_count}, -{stats.negative_count}" + ) return f"unique {stats.unique_count}/{stats.row_count}" diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index e6b6c9c..c3d6aa6 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -10970,6 +10970,32 @@ def first_nonzero_or_present(*columns: str) -> pd.Series: def has_any(*columns: str) -> bool: return any(column in result.columns for column in columns) + def signed_rental_income() -> pd.Series: + if has_any("rental_income_positive", "rental_income_negative"): + return first_present("rental_income_positive") - first_present( + "rental_income_negative" + ) + return first_present("rental_income") + + def first_signed_or_present(*columns: str) -> pd.Series: + fallback: pd.Series | None = 
None + for column in columns: + if column not in result.columns: + continue + candidate = first_present(column) + if fallback is None: + fallback = candidate + if candidate.lt(0.0).any(): + if fallback is not None: + return candidate.where(candidate.ne(0.0), fallback) + return candidate + return fallback if fallback is not None else zero.copy() + + signed_self_employment_income = first_signed_or_present( + "self_employment_income_before_lsr", + "self_employment_income", + ) + if "is_female" in result.columns: result["is_female"] = result["is_female"].fillna(False).astype(bool) elif "sex" in result.columns: @@ -11104,13 +11130,10 @@ def has_any(*columns: str) -> bool: result["takes_up_ssi_if_eligible"] = first_present("ssi").gt(0.0) known_nonemployment = ( - first_nonzero_or_present( - "self_employment_income_before_lsr", - "self_employment_income", - ) + signed_self_employment_income + first_nonzero_or_present("taxable_interest_income", "interest_income") + first_nonzero_or_present("ordinary_dividend_income", "dividend_income") - + first_present("rental_income") + + signed_rental_income() + first_present("gross_social_security", "social_security") + first_present("ssi") + first_present("public_assistance") @@ -11133,10 +11156,7 @@ def has_any(*columns: str) -> bool: ) else fallback_employment_income ) - result["self_employment_income_before_lsr"] = first_nonzero_or_present( - "self_employment_income_before_lsr", - "self_employment_income", - ) + result["self_employment_income_before_lsr"] = signed_self_employment_income result["taxable_interest_income"] = first_nonzero_or_present( "taxable_interest_income", "interest_income", @@ -11189,10 +11209,13 @@ def has_any(*columns: str) -> bool: result["partnership_s_corp_income"] = first_present("partnership_s_corp_income") result["partnership_se_income"] = first_present("partnership_se_income") result["estate_income"] = first_present("estate_income") - result["farm_income"] = first_present("farm_income") + 
result["farm_income"] = first_signed_or_present( + "farm_income", + "farm_operations_income", + ) result["farm_operations_income"] = first_present("farm_operations_income") result["farm_rent_income"] = first_present("farm_rent_income") - result["rental_income"] = first_present("rental_income") + result["rental_income"] = signed_rental_income() result["w2_wages_from_qualified_business"] = first_present( "w2_wages_from_qualified_business" ).clip(lower=0.0) diff --git a/tests/pipelines/test_check_export_columns.py b/tests/pipelines/test_check_export_columns.py index c74162f..f8c1acd 100644 --- a/tests/pipelines/test_check_export_columns.py +++ b/tests/pipelines/test_check_export_columns.py @@ -194,6 +194,89 @@ def test_support_baseline_rejects_numeric_column_eCPS_populates( assert rc == 1 +def test_support_baseline_rejects_missing_numeric_sign_support( + tmp_path, +): + contract_path = _write_json( + tmp_path / "contract.json", + { + "required": ["age", "snap", "rental_income"], + "ecps_internal_optional": [], + "forbidden": [], + }, + ) + candidate = _write_period_h5( + tmp_path / "candidate.h5", + { + "age": [34, 42, 50], + "snap": [False, True, False], + "rental_income": [0.0, 12_000.0, 0.0], + }, + ) + baseline = _write_period_h5( + tmp_path / "baseline.h5", + { + "age": [34, 42, 50], + "snap": [False, True, False], + "rental_income": [-200.0, 12_000.0, 0.0], + }, + ) + diagnostics = tmp_path / "support.json" + + rc = main( + [ + str(candidate), + "--contract", + str(contract_path), + "--support-baseline", + str(baseline), + "--support-diagnostics-json", + str(diagnostics), + ] + ) + + assert rc == 1 + payload = json.loads(diagnostics.read_text()) + assert payload["issues"][0]["column"] == "rental_income" + assert payload["issues"][0]["requirement"] == "numeric_signed" + assert payload["issues"][0]["baseline"]["negative_count"] == 1 + assert payload["issues"][0]["candidate"]["negative_count"] == 0 + + +def 
test_support_baseline_accepts_negative_noise_for_unsigned_numeric( + tmp_path, + contract_path, +): + candidate = _write_period_h5( + tmp_path / "candidate.h5", + { + "age": [34, 42, 50], + "snap": [False, True, False], + "employment_income": [0.0, 12_000.0, 0.0], + }, + ) + baseline = _write_period_h5( + tmp_path / "baseline.h5", + { + "age": [34, 42, 50], + "snap": [False, True, False], + "employment_income": [-200.0, 12_000.0, 0.0], + }, + ) + + rc = main( + [ + str(candidate), + "--contract", + str(contract_path), + "--support-baseline", + str(baseline), + ] + ) + + assert rc == 0 + + def test_support_baseline_rejects_categorical_column_eCPS_varies( tmp_path, contract_path, diff --git a/tests/pipelines/test_mp300k_artifact_gates.py b/tests/pipelines/test_mp300k_artifact_gates.py index e3d963c..c5598bd 100644 --- a/tests/pipelines/test_mp300k_artifact_gates.py +++ b/tests/pipelines/test_mp300k_artifact_gates.py @@ -361,7 +361,7 @@ def test_export_support_gate_rejects_ecps_populated_numeric_filler(tmp_path): assert support_gate["status"] == "fail" assert support_gate["metrics"]["unsupported_populated_export_column_count"] == 1 assert support_gate["details"]["issues"][0]["column"] == "hourly_wage" - assert support_gate["details"]["issues"][0]["requirement"] == "numeric_nonzero" + assert support_gate["details"]["issues"][0]["requirement"] == "numeric_positive" def test_export_support_gate_rejects_ecps_varied_categorical_filler(tmp_path): diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 02fe434..13ae9c5 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -5247,6 +5247,60 @@ def test_augment_policyengine_person_inputs_aliases_rent_to_pre_subsidy_rent( 9_600.0, ] + def test_augment_policyengine_person_inputs_recomposes_signed_rental_income( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + persons = pd.DataFrame( + { + "age": [45, 50, 55], + "sex": [1, 2, 1], + "income": [1_000.0, 1_000.0, 
1_000.0], + "rental_income": [900.0, 900.0, 900.0], + "rental_income_positive": [300.0, 0.0, 50.0], + "rental_income_negative": [100.0, 200.0, 0.0], + } + ) + + augmented = pipeline._augment_policyengine_person_inputs(persons) + + assert augmented["rental_income"].tolist() == [200.0, -200.0, 50.0] + assert augmented["employment_income_before_lsr"].tolist() == [ + 800.0, + 1_200.0, + 950.0, + ] + + def test_augment_policyengine_person_inputs_prefers_signed_business_losses( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + persons = pd.DataFrame( + { + "age": [45, 50, 55], + "sex": [1, 2, 1], + "income": [1_000.0, 1_000.0, 1_000.0], + "self_employment_income_before_lsr": [50.0, 60.0, 70.0], + "self_employment_income": [100.0, -25.0, 0.0], + "farm_income": [20.0, 30.0, 40.0], + "farm_operations_income": [10.0, -15.0, 0.0], + } + ) + + augmented = pipeline._augment_policyengine_person_inputs(persons) + + assert augmented["self_employment_income_before_lsr"].tolist() == [ + 100.0, + -25.0, + 70.0, + ] + assert augmented["farm_income"].tolist() == [10.0, -15.0, 40.0] + assert augmented["employment_income_before_lsr"].tolist() == [ + 900.0, + 1_025.0, + 930.0, + ] + def test_augment_policyengine_person_inputs_zeros_part_b_without_medicare( self, ):