From 2ee436ff4cf46d28c876e889fbb80a52553b35e4 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Fri, 29 May 2026 17:38:09 -0400 Subject: [PATCH 1/6] Add tests for UQ assign_intermediate_uncertainties --- test/rmgpy/tools/uncertaintyTest.py | 75 +++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 9 deletions(-) diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 35ac31049a..69e3badde7 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -169,16 +169,73 @@ def test_uncertainty_assignment(self): thermo_unc = self.uncertainty.thermo_input_uncertainties kinetic_unc = self.uncertainty.kinetic_input_uncertainties - np.testing.assert_allclose( - thermo_unc, - [1.5, 1.5, 2.61966, 2.51994, 2.23886, 1.5, 2.30761, 2.41611, 2.61966, 2.51994, 2.61966, 2.51994, 2.61966, 1.5, 2.23886, 2.30761, 1.5, 2.61966, 2.07366, 2.19376, 2.19376, 2.30761, 1.94616, 2.07366, 2.07366], - rtol=1e-4, - ) - np.testing.assert_allclose( - kinetic_unc, - [0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 1.5363, 1.5363, 0.5], - rtol=1e-4 + expected_uncorrelated_thermo_uncertainties = np.array([1.5, 1.5, 2.61966, 2.51994, 2.23886, 1.5, 2.30761, 2.41611, 2.61966, 2.51994, 2.61966, 2.51994, 2.61966, 1.5, 2.23886, 2.30761, 1.5, 2.61966, 2.07366, 2.19376, 2.19376, 2.30761, 1.94616, 2.07366, 2.07366]) + expected_uncorrelated_kinetic_uncertainties = np.array([0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 1.5363, 1.5363, 0.5]) + np.testing.assert_allclose(thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) + np.testing.assert_allclose(kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + + # ---------------------------- Now repeat for assign_intermediate_uncertainties ----------------------------- + # uncorrelated + self.uncertainty.assign_intermediate_uncertainties(correlated=False) + intermediate_thermo_unc = self.uncertainty.thermo_intermediate_uncertainties + intermediate_kinetic_unc = self.uncertainty.kinetic_intermediate_uncertainties + np.testing.assert_allclose(intermediate_thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) + np.testing.assert_allclose(intermediate_kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + + # correlated + self.uncertainty.assign_intermediate_uncertainties(correlated=True) + + # do a spot check on some of the intermediates (dG/dq) these are derivatives, not uncertainties + # Thermo library example + assert self.uncertainty.thermo_intermediate_uncertainties[0].keys() == {'Library O(0)'} + assert self.uncertainty.thermo_intermediate_uncertainties[0]['Library O(0)'] == 1 + + # Thermo GAV example + assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Estimation HO2(2)'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(group) O2s-OsH'] == 2 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(other) R'] == 2 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(radical) HOOJ'] == 1 + + # Thermo library + GAV + assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Estimation CH3(14)'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Group(radical) CH3'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Library CH4(16)'] == 1 + + # Kinetics library + assert self.uncertainty.kinetic_intermediate_uncertainties[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} + + # Rate rule (exact) + assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') + assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 + assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 + + # Rate rule (non-exact, multiple rule weights) + assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[3].keys())) == ( + 'Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs', + 'Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad', ) + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 + assert np.isclose(self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 + + # Training reaction + assert self.uncertainty.kinetic_intermediate_uncertainties[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} + assert self.uncertainty.kinetic_intermediate_uncertainties[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 + + # PDEP + assert self.uncertainty.kinetic_intermediate_uncertainties[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} + assert self.uncertainty.kinetic_intermediate_uncertainties[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 + + # correlated uncertainties should match uncorrelated, so check diagonal of covariance matrix + thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) + kinetic_covariance = np.sqrt(self.uncertainty.get_kinetic_covariance_matrix().diagonal()) + assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() + assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() def test_local_analysis(self): """ From 33b5fa0826691c9cfcd95ea37217db26907414a6 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Wed, 27 May 2026 12:06:23 -0400 Subject: [PATCH 2/6] Add uncertainty tests for correlated library match Add some tests to make sure the correct index is referenced when assembling thermo sources that use libraries. For example, if CH3 is estimated from a CH4 library + a radical correction, we want to make sure the uncertainty source points to CH4 as the index for the library value used (as opposed to CH3) --- test/rmgpy/tools/uncertaintyTest.py | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 69e3badde7..b3772ebecf 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -237,6 +237,52 @@ def test_uncertainty_assignment(self): assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() + def test_source_correlations(self): + # Check some examples of different species containing the same sources + + # ------------------------------------------------------------------------------ + # Make sure CH3 (Library + Radical) has a library index/value in common with CH4 + i_CH4 = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='C'), self.uncertainty.species_list) + assert i_CH4 >= 0 + + i_CH3 = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='[CH3]'), self.uncertainty.species_list) + assert i_CH3 >= 0 + + self.uncertainty.extract_sources_from_model() + self.uncertainty.assign_parameter_uncertainties(correlated=True) + + src1 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH4]] # CH4 + src2 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH3]] # CH3 + + assert 'Library' in src1 + assert 'Library' in src2 + assert 'GAV' in src2 + assert src1['Library'] == src2['Library'] # make sure they refer to the same library source + + # ----------------------------------------------------------------------------- + # Make sure CH3X (Library + GAV + Adsorption Correction) has a library index/value in common with CH4 (Library) + i_CH3X = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='C*'), self.uncertainty.species_list) + assert i_CH3X == -1 + # This is not in the model, so add it to the species list + CH3X = rmgpy.species.Species(smiles='C*') + CH3X.thermo = self.uncertainty.database.thermo.get_thermo_data(CH3X) + self.uncertainty.species_list.append(CH3X) + i_CH3X = rmgpy.tools.uncertainty.get_i_thing(CH3X, self.uncertainty.species_list) + assert i_CH3X >= 0 + + self.uncertainty.extract_sources_from_model() + self.uncertainty.assign_parameter_uncertainties(correlated=True) + + src1 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH4]] # CH4 + src2 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH3X]] # CH3X + + assert 'Library' in src1 + assert 'Library' in src2 + assert 'ADS' in src2 + assert 'GAV' in src2 + assert src1['Library'] == src2['Library'] # make sure they refer to the same library source + self.uncertainty.species_list.pop() # remove the extra species so it doesn't affect other tests + def test_local_analysis(self): """ Test to run uncorrelated and then correlated local_analysis and make sure the results are expected From 6f5a3ead77047608f45e2ee31dc6a88ba7308c7e Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 09:44:42 -0400 Subject: [PATCH 3/6] refactor thermo extract_source_from_comments This refactors the extract_source_from_comments function and changes library sources so that a tuple of the library name and the library entry object is returned instead of just the library name. It also returns a tuple of QM method and molecule used for the case of QM sources. --- rmgpy/data/thermo.py | 272 +++++++++++++++++++++++----------- test/rmgpy/data/thermoTest.py | 117 +++++---------- 2 files changed, 227 insertions(+), 162 deletions(-) diff --git a/rmgpy/data/thermo.py b/rmgpy/data/thermo.py index 5a699ae223..3dd817dab9 100644 --- a/rmgpy/data/thermo.py +++ b/rmgpy/data/thermo.py @@ -1643,7 +1643,7 @@ def correct_binding_energy(self, thermo, species, metal_to_scale_from=None, meta thermo.comment += f" Binding energy corrected by LSR ({'+'.join(comments)}) from {metal_to_scale_from} (H={change_in_binding_energy/1e3:+.0f}kJ/mol)" return thermo - def get_thermo_data_for_surface_species(self, species): + def get_thermo_data_for_surface_species(self, species, return_resonance_data=False): """ Get the thermo data for an adsorbed species, by desorbing it, finding the thermo of the gas-phase @@ -1652,6 +1652,9 @@ def get_thermo_data_for_surface_species(self, species): Does not apply linear scaling relationship. Returns a :class:`ThermoData` object, with no Cp0 or CpInf + + option to return resonance data if return_resonance_data is True, + which is useful for identifying the specific molecule chosen """ # define the comparison function to find the lowest energy @@ -1741,7 +1744,7 @@ def species_enthalpy(species): atom.label = label # a tuple of molecule and its thermo - resonance_data.append((molecule, thermo)) + resonance_data.append((molecule, thermo, gas_phase_species)) # Get the enthalpy of formation of every adsorbate at 298 K and # determine the resonance structure with the lowest enthalpy of formation. @@ -1751,7 +1754,7 @@ def species_enthalpy(species): resonance_data = sorted(resonance_data, key=lambda x: x[1].H298.value_si) # reorder the resonance structures (molecules) by their H298 - species.molecule = [mol for mol, thermo in resonance_data] + species.molecule = [mol for mol, thermo, gas_phase_species in resonance_data] thermo = resonance_data[0][1] @@ -1760,6 +1763,9 @@ def species_enthalpy(species): find_cp0_and_cpinf(species, thermo) + if return_resonance_data is True: # is True makes this a little more error-proof in case user accidentally provides another argument that evaluates to True + return thermo, resonance_data + return thermo def _add_adsorption_correction(self, adsorption_thermo, adsorption_groups, molecule, surface_sites): @@ -2854,82 +2860,9 @@ def get_ring_groups_from_comments(self, thermo_data): return ring_groups, polycyclic_groups - def extract_source_from_comments(self, species): - """ - `species`: A species object containing thermo data and thermo data comments - - Parses the verbose string of comments from the thermo data of the species object, - and extracts the thermo sources. - - Returns a dictionary with keys of 'Library', 'QM', 'ADS', and/or 'GAV'. - Commonly, species thermo are estimated using only one of these sources. - However, a radical can be estimated with more than one type of source, for - instance a saturated library value and a GAV HBI correction, or a QM saturated value - and a GAV HBI correction. Adsorbates can be estimated using a single library - for the adsorbate or a combination of a gas phase library for the - gas phase portion and an adsorption correction. - - source = {'Library': String_Name_of_Library_Used, - 'QM': String_of_Method_Used, - 'GAV': Dictionary_of_Groups_Used, - 'ADS': Dictionary_of_Adsorption_Group_Used, - } - - The Dictionary_of_Groups_Used looks like - {'groupType':[List of tuples containing (Entry, Weight)] - """ - comment = species.thermo.comment - tokens = comment.split() - - source = {} - - if comment.startswith('Thermo library'): - # Store name of the library source, which is the 3rd token in the comments - source['Library'] = tokens[2] - - elif comment.startswith('QM'): - # Store the level of the calculation, which is the 2nd token in the comments - source['QM'] = tokens[1] - - elif comment.startswith('Gas phase thermo'): - # Handle adsorption correction thermo data of the following format: - # Library example - # Gas phase thermo for C(T) from Thermo library: primaryThermoLibrary. - # Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(Cq*) - - # GAV example - # Gas phase thermo for [CH]CC from Thermo group additivity estimation: group(Cs-CsCsHH) + group(Cs-CsHHH) + group(Cs-CsHHH) + radical(CCJ2_triplet). - # Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C=*RCR3)" - - comment = comment.replace(r'\n', ' ') - comment = comment.replace('\n', ' ') - if 'Adsorption correction:' not in comment: - raise ValueError(f'adsorption correction in unrecognized format {comment}') - - # Handle the gas-phase portion first - gas_comment = comment.split('Adsorption correction: + ')[0].strip() - if gas_comment.endswith('.'): - gas_comment = gas_comment[:-1] # delete the . at the end if it exists - gas_comment = gas_comment[gas_comment.find('from ', len('Gas phase thermo for ')) + len('from '):] - dummy_gas_phase_species = Species() - dummy_gas_phase_species.thermo = NASA() - dummy_gas_phase_species.thermo.comment = gas_comment - source = self.extract_source_from_comments(dummy_gas_phase_species) - - # This is an adsorption correction - # comment is split into two parts: the gas phase, and the surface adsorption correction - ads_correction_comment = comment.split('Adsorption correction: +')[-1].strip() - dummy_adsorption_correction_species = Species() - dummy_adsorption_correction_species.thermo = NASA() - dummy_adsorption_correction_species.thermo.comment = ads_correction_comment - source['ADS'] = self.extract_source_from_comments(dummy_adsorption_correction_species)['GAV'] - - return source - - # Check for group additivity contributions to the thermo in this species - - # The contribution of the groups can be either additive or subtracting - # after changes to the polycyclic algorithm + def _parse_gav_groups(self, comment): + """Extract the groups from the comment""" + groups = {} comment = comment.replace(' + ', ' +') comment = comment.replace(' - ', ' -') @@ -2939,10 +2872,12 @@ def extract_source_from_comments(self, species): # groups are still split by spaces comment = comment.replace(')\n+', ') +') comment = comment.replace(')\n-', ') -') + # `Thermo group additivity estimation:\nadsorptionPt111(...)` shows up in + # adsorbate comments - keep the trailing colon separated from the group token. + comment = comment.replace(':\n', ': ') comment = comment.replace('\n', '') tokens = comment.split(' ') - groups = {} group_types = list(self.groups.keys()) regex = r"\((.*)\)" # only hit outermost parentheses @@ -2970,14 +2905,185 @@ def extract_source_from_comments(self, species): if groups: # Indicate that group additivity is used when it is either an HBI correction - # onto a thermo library or QM value, or if the entire molecule is estimated using group additivity + # onto a thermo library or QM value, or if the entire molecule is estimated using group additivity # Save the groups into the source dictionary - # Convert groups back into tuples + # Convert groups back into tuples for groupType, groupDict in groups.items(): groups[groupType] = list(groupDict.items()) - source['GAV'] = groups + return groups + + def _parse_library_source(self, comment, library_species): + # handle the library source comment, which looks like "Thermo library: library_name" + # we then need to retrieve the specific library entry given the species + # but may have unfortunate line breaks in the middle + + # trim the comment down to just the library portion so it starts with Thermo library: + split_loc = comment.find('Thermo library:') + if split_loc == -1: + raise ValueError(f"Expected 'Thermo library:' in comment, got {comment}") + + comment = comment[split_loc:] + + # library name is the token that comes immediately after 'Thermo library:' + assert 'Thermo library:' in comment, f"Expected 'Thermo library:' in comment, got {comment}" + tokens = comment.split() + library_name = tokens[2] + if library_name not in self.libraries: + raise DatabaseError(f"Thermo library '{library_name}' referenced in comment is not loaded. Make sure libraries match input file used to generate thermo.") + + results = self.get_thermo_data_from_library(library_species, self.libraries[library_name]) + if results is None: + raise DatabaseError(f"Could not find a library match for {library_species} in library {library_name}") + + data, thermo_library, library_entry = results + return (library_name, library_entry) + + def _parse_adsorption_correction(self, comment): + # handle the adsorption correction comment, which looks like + # "Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR2CR3)" + # but may have unfortunate line breaks in the middle + + # check that the number of tokens matches our expectation for an adsorption correction + # should be 8, maybe 9 if there was a weird line break + tokens = comment.split() + if len(tokens) not in [8, 9]: + raise ValueError(f"Expected 8 or 9 tokens in adsorption correction comment, got {len(tokens)}: {comment}") + + ADS = self._parse_gav_groups(comment) + + if len(ADS) > 1: + raise ValueError("Only adsorption corrections should be present in the adsorption correction portion of the comment. Found: {}".format(ADS)) + + return ADS + + def extract_source_from_comments(self, species): + """ + `species`: A species object containing thermo data and thermo data comments + + Parses the verbose string of comments from the thermo data of the species object, + and extracts the thermo sources. + + Returns a dictionary with keys of 'Library', 'QM', 'ADS', and/or 'GAV'. + Commonly, species thermo are estimated using only one of these sources. + However, a radical can be estimated with more than one type of source, for + instance a saturated library value and a GAV HBI correction, or a QM saturated value + and a GAV HBI correction. Adsorbates can be estimated using a single library + for the adsorbate or a combination of a gas phase library for the + gas phase portion and an adsorption correction. + + source = {'Library': (String_Name_of_Library_Used, Library_Entry_Used), + 'QM': (String_of_Method_Used, Species_Used_for_QM), + 'GAV': Dictionary_of_Groups_Used, + 'ADS': Dictionary_of_Adsorption_Group_Used, + } + + The Dictionary_of_Groups_Used looks like + {'groupType':[List of tuples containing (Entry, Weight)] + """ + + # TODO: solvent, electrocat, LSR + source = {} + + comment = species.thermo.comment + tokens = comment.split() + + ads_correction = 'Gas phase thermo' in comment and 'Adsorption correction:' in comment + library = 'Thermo library' in comment + QM = 'QM' in tokens + GAV = 'Thermo group additivity estimation:' in comment # ambiguous since ads correction looks identical to group + + # the biggest thing to split on first is the adsorption correction + if ads_correction: + # The source options here are: + # (Library(gas-phase species), Adsorption correction) + # (QM(gas-phase species), Adsorption correction) + # (GAV(gas-phase species), Adsorption correction) + # (Library(gas-phase species), GAV(radical correction), Adsorption correction) + # (QM(gas-phase species), GAV(radical correction), Adsorption correction) + + # split the comment into the gas phase thermo portion and the adsorption correction portion + split_loc = comment.find('Adsorption correction:') + if split_loc == -1: + raise ValueError(f"Expected 'Adsorption correction:' in comment, got {comment}") + gas_comment = comment[:split_loc].strip() + if gas_comment.endswith('.'): + gas_comment = gas_comment[:-1] # the period that closed the gas-phase sentence + ads_correction_comment = comment[split_loc:].strip() + source['ADS'] = self._parse_adsorption_correction(ads_correction_comment) + + # get the desorbed gas species + species_copy = deepcopy(species) + thermo, resonance_data = self.get_thermo_data_for_surface_species(species_copy, return_resonance_data=True) + desorbed_gas_species = resonance_data[0][2] + + groups = self._parse_gav_groups(gas_comment) + if groups: # (Library/QM + GAV + ADS) or (GAV + ADS) + source['GAV'] = groups # handle the GAV portion + + if library or QM: # (Library/QM + GAV + ADS) + # this means the library/QM species is the desorbed, saturated gas-phase version of the adsorbate + + assert desorbed_gas_species.molecule[0].is_radical(), "Method only valid for radicals." + molecule = desorbed_gas_species.molecule[0] # no need to deepcopy again since get_desorbed_molecules already does deepcopy + molecule.saturate_radicals() # note, this returns a dictionary instead of the Molecule object, but it modifies the molecule in place, so we can just ignore the returned dictionary + saturated_desorbed_gas_species = Species(molecule=[molecule]) + saturated_desorbed_gas_species.generate_resonance_structures() + + if library: # (Library(gas-phase species), GAV(radical correction), Adsorption correction) + source['Library'] = self._parse_library_source(gas_comment, saturated_desorbed_gas_species) + if QM: # (QM(gas-phase species), GAV(radical correction), Adsorption correction) + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], saturated_desorbed_gas_species) + + else: + # no groups, so this is (Library/QM + ADS) + # in this case, the library/QM species is the desorbed gas-phase molecule of the adsorbate + if library: + source['Library'] = self._parse_library_source(gas_comment, desorbed_gas_species) + if QM: + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], desorbed_gas_species) + + else: + # gas phase only, source options are: + # (Library) + # (QM) + # (GAV) + # (Library, GAV) + # (QM, GAV) + groups = self._parse_gav_groups(comment) + GAV = 'Thermo group additivity estimation:' in comment + if GAV and not groups: + raise ValueError("No groups were found in the comments but 'Thermo group additivity estimation:' was in the comment. Comment: {}".format(comment)) + elif not GAV and groups: + if 'radical' not in groups.keys(): + raise ValueError("Groups were found in the comments but 'Thermo group additivity estimation:' was not in the comment. Comment: {}".format(comment)) + + if groups: + # Get groups first + source['GAV'] = groups + if library or QM: # (Library/QM, GAV) + # get the saturated species for the library source + if 'radical' not in groups.keys(): + raise ValueError("Method only valid for radicals, but no radical groups were found. Comment: {}".format(comment)) + molecule = deepcopy(species.molecule[0]) + assert molecule.is_radical(), "Method only valid for radicals." + molecule.saturate_radicals() # note, this returns a dictionary instead of the Molecule object, but it modifies the molecule in place, so we can just ignore the returned dictionary + saturated_species = Species(molecule=[molecule]) + saturated_species.generate_resonance_structures() + if library: # (Library, GAV) + source['Library'] = self._parse_library_source(comment, saturated_species) + if QM: # (QM, GAV) + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], saturated_species) + else: # (Library) or (QM) + if library: + source['Library'] = self._parse_library_source(comment, species) + if QM: + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], species) # Perform a sanity check that this molecule is estimated by at least one method if not list(source.keys()): diff --git a/test/rmgpy/data/thermoTest.py b/test/rmgpy/data/thermoTest.py index 656f4b8554..2d87e22de8 100644 --- a/test/rmgpy/data/thermoTest.py +++ b/test/rmgpy/data/thermoTest.py @@ -289,96 +289,50 @@ def test_parse_thermo_comments(self): assert len(source["GAV"]["longDistanceInteraction_cyclic"]) == 1 assert len(source["GAV"]["ring"]) == 1 - # Pure library thermo - dipk = Species( + propane = Species( index=1, - label="DIPK", + label="propane", thermo=NASA( polynomials=[ - NASAPolynomial( - coeffs=[ - 3.35002, - 0.017618, - -2.46235e-05, - 1.7684e-08, - -4.87962e-12, - 35555.7, - 5.75335, - ], - Tmin=(100, "K"), - Tmax=(888.28, "K"), - ), - NASAPolynomial( - coeffs=[ - 6.36001, - 0.00406378, - -1.73509e-06, - 5.05949e-10, - -4.49975e-14, - 35021, - -8.41155, - ], - Tmin=(888.28, "K"), - Tmax=(5000, "K"), - ), + NASAPolynomial(coeffs=[0.4987,0.0308692,-7.94064e-06,-5.75209e-09,3.10575e-12,-14122,21.6027], Tmin=(298,'K'), Tmax=(1036.31,'K')), + NASAPolynomial(coeffs=[2.0491,0.0302287,-1.47485e-05,3.60335e-09,-3.51534e-13,-14730.3,12.6832], Tmin=(1036.31,'K'), Tmax=(3000,'K')) ], - Tmin=(100, "K"), - Tmax=(5000, "K"), - comment="""Thermo library: DIPK""", + Tmin=(298,'K'), Tmax=(3000,'K'), E0=(-119.854,'kJ/mol'), Cp0=(33.2579,'J/(mol*K)'), CpInf=(249.434,'J/(mol*K)'), + label="""C3H8""", + comment="""Thermo library: DFT_QCI_thermo""" ), - molecule=[Molecule(smiles="CC(C)C(=O)C(C)C")], + molecule=[Molecule(smiles="CCC")], ) - source = self.database.extract_source_from_comments(dipk) - assert "Library" in source + CCC_source = self.database.extract_source_from_comments(propane) + assert "Library" in CCC_source + assert CCC_source["Library"][0] == "DFT_QCI_thermo" # Mixed library and HBI thermo - dipk_rad = Species( + propane_rad = Species( index=4, - label="R_tert", + label="propane_rad", thermo=NASA( polynomials=[ - NASAPolynomial( - coeffs=[ - 2.90061, - 0.0298018, - -7.06268e-05, - 6.9636e-08, - -2.42414e-11, - 54431, - 5.44492, - ], - Tmin=(100, "K"), - Tmax=(882.19, "K"), - ), - NASAPolynomial( - coeffs=[ - 6.70999, - 0.000201027, - 6.65617e-07, - -7.99543e-11, - 4.08721e-15, - 54238.6, - -9.73662, - ], - Tmin=(882.19, "K"), - Tmax=(5000, "K"), - ), + NASAPolynomial(coeffs=[0.599885,0.0320439,-2.63211e-05,1.15072e-08,-2.01662e-12,66381.4,20.2736], Tmin=(298,'K'), Tmax=(1331.58,'K')), + NASAPolynomial(coeffs=[6.43539,0.0145141,-6.57382e-06,1.62046e-09,-1.60381e-13,64827.3,-9.55036], Tmin=(1331.58,'K'), Tmax=(3000,'K')) ], - Tmin=(100, "K"), - Tmax=(5000, "K"), - comment="""Thermo library: DIPK + radical(C2CJCHO)""", + Tmin=(298,'K'), Tmax=(3000,'K'), E0=(549.676,'kJ/mol'), Cp0=(33.2579,'J/(mol*K)'), CpInf=(249.434,'J/(mol*K)'), + comment="""Thermo library: DFT_QCI_thermo + radical(CJ3)""" ), molecule=[ - Molecule(smiles="C[C](C)C(=O)C(C)C"), - Molecule(smiles="CC(C)=C([O])C(C)C"), + Molecule(smiles="CC[C]") ], ) - source = self.database.extract_source_from_comments(dipk_rad) - assert "Library" in source - assert "GAV" in source - assert len(source["GAV"]["radical"]) == 1 + CCC_rad_source = self.database.extract_source_from_comments(propane_rad) + assert "Library" in CCC_rad_source + assert "GAV" in CCC_rad_source + assert len(CCC_rad_source["GAV"]["radical"]) == 1 + + assert CCC_source["Library"][0] == CCC_rad_source["Library"][0], "The library source should be the same for the radical and non-radical species since they come from the same library entry." + assert CCC_source["Library"][1].item.is_isomorphic(CCC_rad_source["Library"][1].item), "The library source should be the same molecule for the radical and non-radical species since they come from the same library entry." + # Pure QM thermo cineole = Species( @@ -422,6 +376,7 @@ def test_parse_thermo_comments(self): source = self.database.extract_source_from_comments(cineole) assert "QM" in source + assert source["QM"][0] == "MopacMolPM3" # Mixed QM and HBI thermo cineole_rad = Species( @@ -465,6 +420,7 @@ def test_parse_thermo_comments(self): source = self.database.extract_source_from_comments(cineole_rad) assert "QM" in source + assert source["QM"][0] == "MopacMolPM3" assert "GAV" in source assert len(source["GAV"]["radical"]) == 1 @@ -604,9 +560,12 @@ def test_parse_thermo_comments(self): OX = rmgpy.species.Species(smiles="O=*") OX.thermo = rmgpy.thermo.NASA() OX.thermo.comment = 'Thermo library: surfaceThermoPt111' - source = self.database.extract_source_from_comments(OX) + # load the surfaceThermoPt111 library into databaseWithNoLibraries for this one example since it's not worth the ~7s it takes to make a separate test database + self.databaseWithoutLibraries.load_libraries(path=os.path.join(settings["database.directory"], "thermo", "libraries"), libraries=["surfaceThermoPt111"]) + source = self.databaseWithoutLibraries.extract_source_from_comments(OX) + self.databaseWithoutLibraries.unload_libraries('surfaceThermoPt111') # unload the library again so it doesn't interfere with other tests assert "Library" in source - assert source["Library"] == "surfaceThermoPt111" + assert source["Library"][0] == "surfaceThermoPt111" # Gas library + adsorption correction CH2X = rmgpy.species.Species(smiles="[CH2]=*") @@ -614,7 +573,7 @@ def test_parse_thermo_comments(self): CH2X.thermo.comment = 'Gas phase thermo for CH2(T) from Thermo library: primaryThermoLibrary. Adsorption correction: + Thermo group additivity estimation:\nadsorptionPt111(C=XR2)' source = self.database.extract_source_from_comments(CH2X) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "primaryThermoLibrary" assert "ADS" in source assert source['ADS']['adsorptionPt111'][0][0].label == 'C=XR2' assert source['ADS']['adsorptionPt111'][0][1] == 1 # weight should be 1 @@ -637,10 +596,10 @@ def test_parse_thermo_comments(self): # Gas library + radical for HBI + adsorption correction CHOX = rmgpy.species.Species(smiles="O=[CH]*") CHOX.thermo = rmgpy.thermo.NASA() - CHOX.thermo.comment = 'Gas phase thermo for [CH]=O from Thermo library: primaryThermoLibrary + radical(HCdsJO). Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR3)' + CHOX.thermo.comment = 'Gas phase thermo for [CH]=O from Thermo library: DFT_QCI_thermo + radical(HCdsJO). Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR3)' source = self.database.extract_source_from_comments(CHOX) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "DFT_QCI_thermo" assert "GAV" in source assert source["GAV"]["radical"][0][0].label == "HCdsJO" assert source["GAV"]["radical"][0][1] == 1 # weight should be 1 @@ -656,7 +615,7 @@ def test_parse_thermo_comments(self): OX.thermo.comment = OX_comment # set the comment to be the generated comment source = self.database.extract_source_from_comments(OX) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "primaryThermoLibrary" assert 'ADS' in source assert source['ADS']['adsorptionPt111'][0][0].label == 'OX' assert source['ADS']['adsorptionPt111'][0][1] == 1 # weight should be 1 @@ -667,7 +626,7 @@ def test_parse_thermo_comments(self): CH2CHCH_ads.thermo = self.database.get_thermo_data(CH2CHCH_ads) source = self.database.extract_source_from_comments(CH2CHCH_ads) assert "Library" in source - assert source["Library"] == "DFT_QCI_thermo" + assert source["Library"][0] == "DFT_QCI_thermo" assert "GAV" in source assert source["GAV"]["radical"][0][0].label == "AllylJ2_triplet" assert source["GAV"]["radical"][0][1] == 1 # weight should be 1 From 210f585cc218662199fad23b31b5be3cbb58b71b Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 10:52:22 -0400 Subject: [PATCH 4/6] return kinetic library entry when parsing comments Before this, the kinetics version of extract_source_from_comment returned the library name, but now it returns a tuple of the library name and the library entry. This makes things much easier downstream in the uncertainty tool. --- rmgpy/data/kinetics/database.py | 22 ++++++++++++++++++++-- test/rmgpy/data/kinetics/kineticsTest.py | 2 +- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/rmgpy/data/kinetics/database.py b/rmgpy/data/kinetics/database.py index e92b3df3c9..12f474fad5 100644 --- a/rmgpy/data/kinetics/database.py +++ b/rmgpy/data/kinetics/database.py @@ -695,7 +695,7 @@ def extract_source_from_comments(self, reaction): A reaction can only be estimated using one of these methods. source = {'RateRules': (Family_Label, OriginalTemplate, RateRules), - 'Library': String_Name_of_Library_Used, + 'Library': (String_Name_of_Library_Used, Library_Entry), 'PDep': Network_Index, 'Training': (Family_Label, Training_Reaction_Entry), } @@ -715,7 +715,25 @@ def extract_source_from_comments(self, reaction): source['Rate Rules'] = data_source elif isinstance(reaction, LibraryReaction): # This reaction comes from a reaction library or seed mechanism - source['Library'] = reaction.library + if reaction.library not in self.libraries: + raise ValueError(f'Library {reaction.library} not found in kinetics database. Make sure libraries match input file used to generate kinetics.') + library_reactions = self.generate_reactions_from_library( + library=self.libraries[reaction.library], + reactants=reaction.reactants, + products=reaction.products + ) + if not library_reactions: + raise ValueError(f'Could not find reaction {reaction} in library {reaction.library} when trying to extract source data from comments.') + entry = self.libraries[reaction.library].entries[library_reactions[0].entry.index] + if len(library_reactions) > 1: + # Library contains multiple reactions of the same type, so pick the one with matching kinetics + for rxn in library_reactions: + if rxn.entry.data.is_similar_to(reaction.kinetics): # units might have been changed, so use is_similar_to instead of direct comparison + entry = self.libraries[reaction.library].entries[rxn.entry.index] + break + else: + raise ValueError(f'Found multiple instances of reaction {reaction} in library {reaction.library}, but none of them have kinetics identical to the reaction kinetics.') + source['Library'] = (reaction.library, entry) elif isinstance(reaction, PDepReaction): # This reaction is a pressure-dependent reaction diff --git a/test/rmgpy/data/kinetics/kineticsTest.py b/test/rmgpy/data/kinetics/kineticsTest.py index 36f21e1085..2f89918b7e 100644 --- a/test/rmgpy/data/kinetics/kineticsTest.py +++ b/test/rmgpy/data/kinetics/kineticsTest.py @@ -672,7 +672,7 @@ def test_parse_kinetics(self): # ------------------------------------------------------------------- # # Source 0 comes from a kinetics library assert "Library" in sources[0] - assert sources[0]["Library"] == "GRI-Mech3.0" + assert sources[0]["Library"][0] == "GRI-Mech3.0" reconstructed_kinetics = self.database.kinetics.reconstruct_kinetics_from_source(reactions[0], sources[0], fix_barrier_height=True) A = reconstructed_kinetics.A.value_si From 004d62c9d64def32e8b26d98ed628f63f2907d39 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 13:46:50 -0400 Subject: [PATCH 5/6] update uncertainty tool to use library entry objects instead of names When the library sources only included the name of the library, the uncertainty code had to compensate by adding extra species that are not in the model. Now that we're using library entry objects we are able to remove the complexity of the uncertainty.extra_species object. --- rmgpy/tools/uncertainty.py | 237 ++++++++-------------------- test/rmgpy/tools/uncertaintyTest.py | 9 +- 2 files changed, 74 insertions(+), 172 deletions(-) diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index 965df1db6d..368a5eae40 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -69,9 +69,9 @@ def get_uncertainty_value(self, source): varG += self.dG_library * self.dG_library if 'Surface_Library' in source: surf_lib_varG = self.dG_surf_lib * self.dG_surf_lib - # covariance libraries should overrule the default uncertainties when available + # covariance libraries should overrule the default uncertainties when available, if self.other_covariances is not None: - label = f'Surface_Library {source["Surface_Library"]}' # match the covariance dict label format + label = source["Surface_Library"][2] # match the covariance dict label format cov_label = (label, label) if cov_label in self.other_covariances: surf_lib_varG = self.other_covariances[cov_label] @@ -92,26 +92,29 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non """ Obtain the partial uncertainty dG/dG_corr*dG_corr, where dG_corr is the correlated parameter - `corr_param` is the parameter identifier itself, which is a integer for QM and library parameters, or a string for group values + `corr_param` is the parameter identifier itself, which is + (method_name, species) for QM + (library_name, library_entry, entry_label) for library parameters + or a string for group values `corr_source_type` is a string, being either 'Library', 'QM', 'GAV', 'ADS', or 'Estimation' `corr_group_type` is a string used only when the source type is 'GAV' and indicates grouptype """ if corr_source_type == 'Library': - if 'Library' in source: - if source['Library'] == corr_param: + if 'Library' in source: # check if same library and index match, could do isomorphism check on the entries, but this is faster + if source['Library'][0] == corr_param[0] and source['Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter return self.dG_library elif corr_source_type == 'Surface_Library': if 'Surface_Library' in source: - if source['Surface_Library'] == corr_param: + if source['Surface_Library'][0] == corr_param[0] and source['Surface_Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter return self.dG_surf_lib elif corr_source_type == 'QM': - if 'QM' in source: - if source['QM'] == corr_param: + if 'QM' in source: # here we do an isomorphism check since no index is saved for QM + if source['QM'][0] == corr_param[0] and source['QM'][1].is_isomorphic(corr_param[1]): # Correlated parameter is a source of the overall parameter return self.dG_QM @@ -439,9 +442,6 @@ def __init__(self, species_list=None, reaction_list=None, output_directory='', t self.thermo_covariance_groups = thermo_covariance_groups self.thermo_covariances_dict = {} # dictionary to store covariances from covariance libraries - # For extra species needed for correlated analysis but not in model - self.extra_species = [] - # Make output directory if it does not yet exist: if not os.path.exists(self.output_directory): try: @@ -500,33 +500,6 @@ def load_model(self, chemkin_path, dictionary_path, transport_path=None, surface transport_path=transport_path, surface_path=surface_path) - def retrieve_saturated_species_from_list(self, species): - """ - Given a radical `species`, this function retrieves the saturated species objects from a list of species objects - and returns the saturated species object along with a boolean that indicates if the species is not part of the model - (True->not in the model, False->in the model) - """ - - molecule = species.molecule[0] - assert molecule.is_radical(), "Method only valid for radicals." - saturated_struct = molecule.copy(deep=True) - saturated_struct.saturate_radicals() - for otherSpecies in self.species_list: - if otherSpecies.is_isomorphic(saturated_struct): - return otherSpecies, False - - # couldn't find saturated species in the model, try libraries - new_spc = Species(molecule=[saturated_struct]) - new_spc.generate_resonance_structures() - thermo = self.database.thermo.get_thermo_data_from_libraries(new_spc) - - if thermo is not None: - new_spc.thermo = thermo - self.species_list.append(new_spc) - return new_spc, True - else: - raise Exception('Could not retrieve saturated species form of {0} from the species list'.format(species)) - def load_thermo_covariances_from_libraries(self): """ This function populates the self.thermo_covariances_dict with covariance data (in units of (kcal/mol)^2) from the given covariance libraries @@ -537,7 +510,7 @@ def load_thermo_covariances_from_libraries(self): See the RMG-database/scripts/compile_BEEF_cov.ipynb Jupyter notebook for more details on how to generate these covariance libraries. - This function only adds covariance data for species that are actually in the model, (or in the extra_species as in the case of the radical/HBI correction) + This function only adds covariance data for species that are actually in the model, (which may still include a subset of library species that are not in the model as in the case of the radical/HBI correction) and only for the thermo source associated with that library. The goal is to keep the dictionary as small as possible because the lookups scale badly. Note: the covariance.npy matrix is in units of (kJ/mol)^2, but gets converted to (kcal/mol)^2 in this function to match the rest of the analysis @@ -596,7 +569,8 @@ def load_thermo_covariances_from_libraries(self): try: label = f'{surface_prefix}Library {self.species_list[i_sp].to_chemkin()}' except IndexError: - label = f'{surface_prefix}Library {self.extra_species[i_sp - len(self.species_list)].to_chemkin()}' + # uses InChI now + label = f'{surface_prefix}Library {self.species_list[i_sp].molecule[0].to_inchi()}' lib_species.label = label subset_indices.append(i_lib) @@ -626,7 +600,7 @@ def load_thermo_covariances_from_groups(self): See the RMG-database/scripts/compile_BEEF_cov.ipynb Jupyter notebook for more details on how to generate these covariance libraries. - This function only adds covariance data for groups and species that are actually in the model, (or in the extra_species as in the case of the radical/HBI correction) + This function only adds covariance data for groups and species that are actually in the model, (which may still include a subset of library entries for species not in the model as in the case of the radical/HBI correction) and only for the thermo source associated with that group/library. The goal is to keep the dictionary as small as possible because the lookups scale badly. Note: the covariance.npy matrix is in units of (kJ/mol)^2, but gets converted to (kcal/mol)^2 in this function to match the rest of the analysis @@ -752,7 +726,7 @@ def load_thermo_covariances_from_groups(self): try: label = f'{surface_prefix}Library {self.species_list[i_sp].to_chemkin()}' except IndexError: - label = f'{surface_prefix}Library {self.extra_species[i_sp - len(self.species_list)].to_chemkin()}' + label = f'{surface_prefix}Library {self.species_list[i_sp].molecule[0].to_inchi()}' lib_species.label = label subset_species_indices.append(i_lib) @@ -772,105 +746,52 @@ def extract_sources_from_model(self): Must be done after loading model and database to work. """ self.species_sources_dict = {} - self.extra_species = [] allowed_source_keys = {'Library', 'QM', 'GAV', 'ADS'} for species in self.species_list: - if species not in self.extra_species: - source = self.database.thermo.extract_source_from_comments(species) - unexpected_source_keys = set(source.keys()) - allowed_source_keys - if unexpected_source_keys: - raise ValueError( - f'Source of thermo must be either Library, QM, GAV, or ADS; ' - f'got unexpected source keys {unexpected_source_keys} for species {species.label}' - ) - - # Now prep the source data - # Do not alter the GAV information, but reassign QM and Library sources to the species indices that they came from - # Also specify the source as a Surface Library (if it has surface sites and comes from a library), for better differentiation when assigning uncertainties - if len(source) == 1: - # The thermo came from a single source, so we know it comes from a value describing the exact species - if 'Library' in source: - # Use just the species index in self.species_list, for better shorter printouts when debugging - source['Library'] = self.species_list.index(species) - if species.contains_surface_site(): - source['Surface_Library'] = source.pop('Library') - if 'QM' in source: - source['QM'] = self.species_list.index(species) - - elif len(source) == 2: - # The thermo has two sources, which indicates it's an HBI correction on top of a library or QM value... - # OR it is an adsorption correction with gas-phase thermo from Library/QM/GAV (no need to edit GAV source) - if 'ADS' in source: - # Need to retrieve the gas-phase molecule that the adsorption correction was applied to, and assign the source of the thermo to be that molecule instead of the surface species - if not species.contains_surface_site(): - raise ValueError('Species uses adsorption correction but does not contain any surface sites') - dummy_gas_species = Species() - dummy_gas_species.molecule = species.molecule[0].get_desorbed_molecules() - # add to species list if it's not already there, so we can reference it in the source dictionary - for spc in self.species_list: - if spc.is_isomorphic(dummy_gas_species): - dummy_gas_species = spc - break - else: - dummy_gas_species.thermo = self.database.thermo.get_thermo_data(dummy_gas_species) - self.species_list.append(dummy_gas_species) - self.extra_species.append(dummy_gas_species) - - if 'Library' in source: - # Use just the species index in self.species_list, for better shorter printouts when debugging - source['Library'] = self.species_list.index(dummy_gas_species) - if 'QM' in source: - source['QM'] = self.species_list.index(dummy_gas_species) - else: - # We must retrieve the original saturated molecule's thermo instead of using the radical species as the source of thermo - saturated_species, ignore_spc = self.retrieve_saturated_species_from_list(species) - - if ignore_spc: # this is saturated species that isn't in the actual model - self.extra_species.append(saturated_species) - - if 'Library' in source: - source['Library'] = self.species_list.index(saturated_species) - - if saturated_species.contains_surface_site(): - source['Surface_Library'] = source.pop('Library') # surface species library + radical correction - if 'QM' in source: - source['QM'] = self.species_list.index(saturated_species) - elif len(source) == 3: - # combination of adsorption correction, GAV (radical), and Library/ML - - if not species.contains_surface_site(): - raise ValueError( - f'Only surface species should have 3 thermo sources (adsorption correction, GAV, and library/QM); ' - f'got species={species.label}, source={source}' - ) - - # retrieve the desorbed version of the surface species-- the thing the adsorption correction was applied to during thermo estimation - dummy_gas_species = Species() - dummy_gas_species.molecule = species.molecule[0].get_desorbed_molecules() - saturated_species, ignore_spc = self.retrieve_saturated_species_from_list(dummy_gas_species) - - if ignore_spc: # this is saturated species that isn't in the actual model - self.extra_species.append(saturated_species) - - if 'Library' in source: - source['Library'] = self.species_list.index(saturated_species) - if 'QM' in source: - source['QM'] = self.species_list.index(saturated_species) + source = self.database.thermo.extract_source_from_comments(species) + unexpected_source_keys = set(source.keys()) - allowed_source_keys + if unexpected_source_keys: + raise ValueError( + f'Source of thermo must be either Library, QM, GAV, or ADS; ' + f'got unexpected source keys {unexpected_source_keys} for species {species.label}' + ) + + # Prep the source data: + # extend the library/QM sources to include species labels specific to the model + # Specify the source as a Surface Library (if it has surface sites and comes from a library), for better differentiation when assigning uncertainties + if 'Library' in source: + lib_species = rmgpy.species.Species(molecule=[source["Library"][1].item]) + label = self.get_species_label(lib_species) + if lib_species.contains_surface_site(): + extended_source = source['Library'] + (f'Surface_Library {label}',) + source['Surface_Library'] = extended_source + source.pop('Library') else: - raise Exception('Source of thermo should not use more than three sources out of ADS, QM, Library, or GAV.') + extended_source = source['Library'] + (f'Library {label}',) + source['Library'] = extended_source - self.species_sources_dict[species] = source + if 'QM' in source: + qm_species = source["QM"][1] + label = self.get_species_label(qm_species) + extended_source = source['QM'] + (f'QM {label}',) + source['QM'] = extended_source + + self.species_sources_dict[species] = source self.reaction_sources_dict = {} for reaction in self.reaction_list: source = self.database.kinetics.extract_source_from_comments(reaction) # Prep the source data # Consider any library or PDep reaction to be an independent parameter for now - # and assign the source to the index of the reaction within self.reaction_list if 'Library' in source: - source['Library'] = self.reaction_list.index(reaction) + # add a label to the end of the tuple with the official name to be used to match covariances if reaction.is_surface_reaction(): - source['Surface_Library'] = source.pop('Library') + extended_source = source['Library'] + (f'Surface_Library {reaction.to_chemkin(self.species_list, kinetics=False)}',) + source['Surface_Library'] = extended_source + source.pop('Library') + else: + extended_source = source['Library'] + (f'Library {reaction.to_chemkin(self.species_list, kinetics=False)}',) + source['Library'] = extended_source elif 'PDep' in source: source['PDep'] = self.reaction_list.index(reaction) elif 'Training' in source: @@ -882,9 +803,6 @@ def extract_sources_from_model(self): raise Exception('Source of kinetics must be either Library, PDep, Training, or Rate Rules') self.reaction_sources_dict[reaction] = source - for spc in self.extra_species: - self.species_list.remove(spc) - # -------------------- load covariance libraries ------------------------# self.load_thermo_covariances_from_libraries() self.load_thermo_covariances_from_groups() @@ -979,6 +897,13 @@ def compile_all_sources(self): for family_label in all_kinetic_sources['Training'].keys(): self.all_kinetic_sources['Training'][family_label] = list(all_kinetic_sources['Training'][family_label]) + def get_species_label(self, species): + i_sp = get_i_thing(species, self.species_list) + if i_sp >= 0: + return self.species_list[i_sp].to_chemkin() + else: + return species.to_chemkin() + def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): """ Assign uncertainties based on the sources of the species thermo and reaction kinetics. @@ -993,36 +918,22 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non for species in self.species_list: if not correlated: - entry = self.species_sources_dict[species] - if 'Surface_Library' in entry: # preconditioning for covariance - # this is an ugly workaround to handle covariances: because get_uncertainty_value needs the species chemkin string to get the covariance - # but the source dictionary only has the index of the surface library entry - entry_copy = entry.copy() - entry_copy['Surface_Library'] = self.species_list[entry_copy['Surface_Library']].to_chemkin() - dG = g_param_engine.get_uncertainty_value(entry_copy) - else: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) + dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) self.thermo_input_uncertainties.append(dG) else: source = self.species_sources_dict[species] dG = {} if 'Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Library', corr_param=source['Library']) - try: - label = 'Library {}'.format(self.species_list[source['Library']].to_chemkin()) - except IndexError: - label = 'Library {}'.format(self.extra_species[source['Library'] - len(self.species_list)].to_chemkin()) + label = source['Library'][2] # the label we added to the end of the library source tuple in extract_sources_from_model dG[label] = pdG if 'Surface_Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', corr_param=source['Surface_Library']) - try: - label = 'Surface_Library {}'.format(self.species_list[source['Surface_Library']].to_chemkin()) - except IndexError: - label = 'Surface_Library {}'.format(self.extra_species[source['Surface_Library'] - len(self.species_list)].to_chemkin()) + label = source['Surface_Library'][2] dG[label] = pdG if 'QM' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'QM', corr_param=source['QM']) - label = 'QM {}'.format(self.species_list[source['QM']].to_chemkin()) + label = source['QM'][2] dG[label] = pdG if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): @@ -1089,12 +1000,12 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non elif 'Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Library', source['Library']) - label = 'Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + label = source['Library'][2] dlnk[label] = dplnk elif 'Surface_Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', source['Surface_Library']) - label = 'Surface_Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + label = source['Surface_Library'][2] dlnk[label] = dplnk elif 'Training' in source: @@ -1134,33 +1045,19 @@ def assign_intermediate_uncertainties(self, g_param_engine=None, k_param_engine= for species in self.species_list: if not correlated: - entry = self.species_sources_dict[species] - if 'Surface_Library' in entry: # preconditioning for covariance - # this is an ugly workaround to handle covariances: because get_uncertainty_value needs the species chemkin string to get the covariance - # but the source dictionary only has the index of the surface library entry - entry_copy = entry.copy() - entry_copy['Surface_Library'] = self.species_list[entry_copy['Surface_Library']].to_chemkin() - dG = g_param_engine.get_uncertainty_value(entry_copy) - else: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) + dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) self.thermo_intermediate_uncertainties.append(dG) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty else: source = self.species_sources_dict[species] dGdq = {} if 'Library' in source: - try: - label = 'Library {}'.format(self.species_list[source['Library']].to_chemkin()) - except IndexError: - label = 'Library {}'.format(self.extra_species[source['Library'] - len(self.species_list)].to_chemkin()) + label = source['Library'][2] dGdq[label] = 1 # dG/dG_lib = 1, because the parameter is never scaled by anything other than 1 when it is used if 'Surface_Library' in source: - try: - label = 'Surface_Library {}'.format(self.species_list[source['Surface_Library']].to_chemkin()) - except IndexError: - label = 'Surface_Library {}'.format(self.extra_species[source['Surface_Library'] - len(self.species_list)].to_chemkin()) + label = source['Surface_Library'][2] dGdq[label] = 1 # dG/dG_surf = 1, because the parameter is never scaled by anything other than 1 when it is used if 'QM' in source: - label = 'QM {}'.format(self.species_list[source['QM']].to_chemkin()) + label = source['QM'][2] dGdq[label] = 1 if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index b3772ebecf..573239ce48 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -137,7 +137,12 @@ def test_uncertainty_assignment(self): assert grp == grp_expected assert rad == rad_expected assert other == other_expected - assert sorted(self.uncertainty.all_thermo_sources["Library"]) == [0, 1, 5, 13, 16] + + library_indices = [] + for source in self.uncertainty.all_thermo_sources["Library"]: + sp = rmgpy.species.Species(molecule=[source[1].item]) + library_indices.append(rmgpy.tools.uncertainty.get_i_thing(sp, self.uncertainty.species_list)) + assert sorted(library_indices) == [0, 1, 5, 13, 16] assert not self.uncertainty.all_thermo_sources["QM"] # Check kinetics sources @@ -160,7 +165,7 @@ def test_uncertainty_assignment(self): rr = set([e.label for e in self.uncertainty.all_kinetic_sources["Rate Rules"]["H_Abstraction"]]) assert rr == H_Abstraction_rr_expected assert set(self.uncertainty.all_kinetic_sources["Training"].keys()) == {"Disproportionation", "H_Abstraction"} - assert self.uncertainty.all_kinetic_sources["Library"] == [0] + assert self.uncertainty.all_kinetic_sources["Library"][0][1].item.is_isomorphic(self.uncertainty.reaction_list[0]) assert self.uncertainty.all_kinetic_sources["PDep"] == [6] # Step 3: assign and propagate uncertainties From 0ecb7e114f32d4bbb7d739970c8201ca1e95724e Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Fri, 29 May 2026 18:33:21 -0400 Subject: [PATCH 6/6] Merge assign_parameter_uncertainties and intermediate_uncertainties There's a lot of repeated code between the old UQ's assign_parameter_uncertainties and the new UQ's assign_intermediate_uncertainties, so this combines them into a single function in order to reuse a lot of that repeated code and make it easier to keep them consistent. --- rmgpy/tools/uncertainty.py | 220 ++++++++-------------------- test/rmgpy/tools/uncertaintyTest.py | 72 ++++----- 2 files changed, 98 insertions(+), 194 deletions(-) diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index 368a5eae40..f470b9a4d9 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -427,10 +427,10 @@ def __init__(self, species_list=None, reaction_list=None, output_directory='', t self.reaction_sources_dict = None self.all_thermo_sources = None self.all_kinetic_sources = None - self.thermo_input_uncertainties = None # previous formulation thermo parameter uncertainties - self.kinetic_input_uncertainties = None # previous formulation kinetic parameter uncertainties - self.thermo_intermediate_uncertainties = None # new formulation thermo parameter uncertainties - can be dependent on each other - self.kinetic_intermediate_uncertainties = None # new formulation kinetic parameter uncertainties - can be dependent on each other + self.thermo_input_uncertainties = None # thermo parameter uncertainties + self.kinetic_input_uncertainties = None # kinetic parameter uncertainties + self.dG_dqs = None # derivatives of Gibbs free energy with respect to underlying thermo parameters + self.dlnk_dqs = None # derivatives of ln(k) with respect to underlying kinetic parameters self.thermo_covariance_matrix = None # covariance matrix of all species thermo uncertainties self.kinetic_covariance_matrix = None # covariance matrix of all reaction kinetic uncertainties self.Sigma_ww_thermo = None # covariance matrix of all underlying thermo parameter uncertainties @@ -916,6 +916,9 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non self.thermo_input_uncertainties = [] self.kinetic_input_uncertainties = [] + self.dG_dqs = [] # a list of dictionaries to store the intermediate derivatives dG_i/dq for each parameter q that contributes to the uncertainty of species i's G + self.dlnk_dqs = [] # a list of dictionaries to store the intermediate derivatives dlnk_i/dq for each parameter q that contributes to the uncertainty of reaction i's k + for species in self.species_list: if not correlated: dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) @@ -923,36 +926,46 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non else: source = self.species_sources_dict[species] dG = {} + dG_dq = {} if 'Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Library', corr_param=source['Library']) label = source['Library'][2] # the label we added to the end of the library source tuple in extract_sources_from_model dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the library parameter is 1, since it's a direct contribution if 'Surface_Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', corr_param=source['Surface_Library']) label = source['Surface_Library'][2] dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the surface library parameter is 1, since it's a direct contribution if 'QM' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'QM', corr_param=source['QM']) label = source['QM'][2] dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the QM parameter is 1, since it's a direct contribution if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): for group, weight in groupList: pdG = g_param_engine.get_partial_uncertainty_value(source, 'ADS', group, adsGroupType) label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) dG[label] = pdG + if weight != 1: + raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) + dG_dq[label] = weight # This should be 1 because there's only one group contribution per adsorption correction if 'GAV' in source: for groupType, groupList in source['GAV'].items(): for group, weight in groupList: pdG = g_param_engine.get_partial_uncertainty_value(source, 'GAV', group, groupType) label = 'Group({}) {}'.format(groupType, group.label) dG[label] = pdG + dG_dq[label] = weight # the derivative of G with respect to the group contribution is the weight of that group contribution to the overall G # We also know if there is group additivity used, there will be uncorrelated estimation error est_pdG = g_param_engine.get_partial_uncertainty_value(source, 'Estimation') if est_pdG: label = 'Estimation {}'.format(species.to_chemkin()) dG[label] = est_pdG + dG_dq[label] = 1 # the derivative of G with respect to the estimation error is 1, since we add the term directly self.thermo_input_uncertainties.append(dG) + self.dG_dqs.append(dG_dq) for reaction in self.reaction_list: if not correlated: @@ -961,6 +974,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non else: source = self.reaction_sources_dict[reaction] dlnk = {} + dlnk_dq = {} if 'Rate Rules' in source: family = source['Rate Rules'][0] source_dict = source['Rate Rules'][1] @@ -975,166 +989,55 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non corr_family=family) label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk - + dlnk_dq[label] = weight # the derivative of ln(k) with respect to the rate rule contribution for ruleEntry, trainingEntry, weight in training: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Rate Rules', corr_param=ruleEntry, corr_family=family) label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk + dlnk_dq[label] = weight # the derivative of ln(k) with respect to the training rule contribution + + N = len(source_dict['rules']) + len(source_dict['training']) # There is also estimation error if rate rules are used (nonexact and family contribute to this) nonexact_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Nonexact', corr_family=family) if nonexact_dplnk: label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = nonexact_dplnk + dlnk_dq[label] = np.log10(N + 1) # the derivative of ln(k) with respect to the nonexact estimation error family_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Family', corr_family=family) if family_dplnk: label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = family_dplnk - + dlnk_dq[label] = 1 # the derivative of ln(k) with respect to the family estimation error elif 'PDep' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'PDep', source['PDep']) label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Library', source['Library']) label = source['Library'][2] dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Surface_Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', source['Surface_Library']) label = source['Surface_Library'][2] dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Training' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Training', source['Training']) family = source['Training'][0] label = 'Training {} {}'.format(family, reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk + dlnk_dq[label] = 1 self.kinetic_input_uncertainties.append(dlnk) - - def assign_intermediate_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): - """ - Assign uncertainties to the intermediate parameters based on the sources of the species thermo and reaction kinetics. - - This fills out the class variables thermo_intermediate_uncertainties and kinetic_intermediate_uncertainties - these are each list of dictionaries. For every species or reaction, it lists all the intermediate sources contributing to that parameter's uncertainty. - - So for example, thermo_intermediate_uncertainties might look something like this: - - thermo_intermediate_uncertainties = [ - {'Group(group) Cds-CdsHH': 2.0, 'Group(radical) CCJ': 1.0, 'Estimation CH(4)': 1.0}, - {'Library CH2(5)': 1.0}, - ] - The keys of the dictionaries are the label names for the intermediate parameters. - and the values are partial derivatives dG_i/dq_w, how the species i Gibbs uncertainty changes with the intermediate parameter w. - - This function is the new formulation's equivalent to assign_parameter_uncertainties and similarly handles both correlated and uncorrelated cases. - But instead of assuming all underlying parameters are independent, here we can allow for dependence as long as we have the covariance - """ - if g_param_engine is None: - g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict) - if k_param_engine is None: - k_param_engine = KineticParameterUncertainty() - - self.thermo_intermediate_uncertainties = [] # store the intermediate dG_i/dq for each parameter q that contributes to the uncertainty of G_i, for use in correlated uncertainty analysis - self.kinetic_intermediate_uncertainties = [] - - for species in self.species_list: - if not correlated: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) - self.thermo_intermediate_uncertainties.append(dG) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty - else: - source = self.species_sources_dict[species] - dGdq = {} - if 'Library' in source: - label = source['Library'][2] - dGdq[label] = 1 # dG/dG_lib = 1, because the parameter is never scaled by anything other than 1 when it is used - if 'Surface_Library' in source: - label = source['Surface_Library'][2] - dGdq[label] = 1 # dG/dG_surf = 1, because the parameter is never scaled by anything other than 1 when it is used - if 'QM' in source: - label = source['QM'][2] - dGdq[label] = 1 - if 'ADS' in source: - for adsGroupType, groupList in source['ADS'].items(): - for group, weight in groupList: - label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) - if weight != 1: - raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) - dGdq[label] = weight # This should be 1 - if 'GAV' in source: - for groupType, groupList in source['GAV'].items(): - for group, weight in groupList: - label = 'Group({}) {}'.format(groupType, group.label) - dGdq[label] = weight # dG/dG_group = weight, because the group contribution is scaled by the weight when it is used in the thermo estimation - # We also know if there is group additivity used, there will be uncorrelated estimation error - label = 'Estimation {}'.format(species.to_chemkin()) - dGdq[label] = 1 # dG/dG_est = 1, because the estimation error is added on top of the group additivity value, so it is never scaled by anything other than 1 when it is used - - self.thermo_intermediate_uncertainties.append(dGdq) - - for reaction in self.reaction_list: - if not correlated: - dlnk = k_param_engine.get_uncertainty_value(self.reaction_sources_dict[reaction]) - self.kinetic_intermediate_uncertainties.append(dlnk) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty - else: - source = self.reaction_sources_dict[reaction] - dlnkdq = {} - if 'Rate Rules' in source: - family = source['Rate Rules'][0] - source_dict = source['Rate Rules'][1] - rules = source_dict['rules'] - training = source_dict['training'] - exact = source_dict['exact'] - surface_prefix = '' - if reaction.is_surface_reaction(): - surface_prefix = 'Surface ' - for ruleEntry, weight in rules: - label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) - dlnkdq[label] = weight # dlnk/dlnk_rule = weight, because the rate rule is scaled by the weight when it is used in the kinetics estimation - - for ruleEntry, trainingEntry, weight in training: - # TODO - test that training reactions in a tree are correlated with the exact match kind of training reaction - # for now, we follow the old convention of treating these as rate rules - label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) # ruleEntry should probably be the reaction equation itself - dlnkdq[label] = weight # dlnk/dlnk_training = weight, because the training entry is scaled by the weight when it is used in the kinetics estimation - - # There is also estimation error if rate rules are used - # Record dlnk/dlnk_family, the derivative with respect to the family estimation uncertainty - label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1 # dlnk/dlnk_family = 1, because the family estimation uncertainty is added on top of the rate rule values, so it is never scaled by anything other than 1 when it is used - - # Record the non-exact estimation error if not an exact match for a rate rule - if not exact: - N = len(source_dict['rules']) + len(source_dict['training']) - label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = np.log10(N + 1) - - elif 'PDep' in source: - label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_PDep = 1, because the PDep kinetics is never scaled by anything other than 1 when it is used - - elif 'Library' in source: - label = 'Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_lib = 1, because the library kinetics is never scaled by anything other than 1 when it is used - - elif 'Surface_Library' in source: - label = 'Surface_Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_surf_lib = 1, because the surface library kinetics is never scaled by anything other than 1 when it is used - - elif 'Training' in source: - family = source['Training'][0] - surface_prefix = '' - if reaction.is_surface_reaction(): - surface_prefix = 'Surface ' - label = '{}Training {} {}'.format(surface_prefix, family, reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 - - self.kinetic_intermediate_uncertainties.append(dlnkdq) + self.dlnk_dqs.append(dlnk_dq) def sensitivity_analysis(self, initial_mole_fractions, sensitive_species, T, P, termination_time, sensitivity_threshold=1e-3, number=10, fileformat='.png', initial_surface_coverages=None, @@ -1434,14 +1337,14 @@ def get_thermo_covariance_matrix(self, g_param_engine=None): NxN square matrix where N is the number of species in the model, with the covariance between species i and j in the ith row and jth column. Units are in (kcal/mol)^2. - Must call assign_intermediate_uncertainties first to populate the source dictionaries. + Must call assign_parameter_uncertainties first to populate the source dictionaries. TODO speed this up with sparse matrix multiplication? """ - assert self.thermo_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' - assert len(self.thermo_intermediate_uncertainties) > 0, 'No thermodynamic parameters found' - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): - self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_intermediate_uncertainties), 2.0) + assert self.thermo_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' + assert len(self.thermo_input_uncertainties) > 0, 'No thermodynamic parameters found' + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case + self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_input_uncertainties), 2.0) return self.thermo_covariance_matrix self.thermo_covariance_matrix = np.zeros((len(self.species_list), len(self.species_list))) @@ -1451,10 +1354,10 @@ def get_thermo_covariance_matrix(self, g_param_engine=None): for i in range(len(self.species_list)): for j in range((len(self.species_list))): - for q in self.thermo_intermediate_uncertainties[i].keys(): - dG_i_dq = self.thermo_intermediate_uncertainties[i][q] - for r in self.thermo_intermediate_uncertainties[j].keys(): - dG_j_dr = self.thermo_intermediate_uncertainties[j][r] + for q in self.dG_dqs[i].keys(): + dG_i_dq = self.dG_dqs[i][q] + for r in self.dG_dqs[j].keys(): + dG_j_dr = self.dG_dqs[j][r] self.thermo_covariance_matrix[i, j] += dG_i_dq * g_param_engine._get_covariance_qq(q, r) * dG_j_dr return self.thermo_covariance_matrix @@ -1465,14 +1368,14 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): MxM square matrix where M is the number of reactions in the model, with the covariance between reaction i and j in the ith row and jth column. Units are in (ln(k))^2. - Must call assign_intermediate_uncertainties first to populate the source dictionaries. + Must call assign_parameter_uncertainties first to populate the source dictionaries. TODO speed this up with sparse matrix multiplication? """ - assert self.kinetic_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' - assert len(self.kinetic_intermediate_uncertainties) > 0, 'No kinetic parameters found' - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): - self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_intermediate_uncertainties), 2.0) + assert self.kinetic_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' + assert len(self.kinetic_input_uncertainties) > 0, 'No kinetic parameters found' + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case + self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_input_uncertainties), 2.0) return self.kinetic_covariance_matrix if k_param_engine is None: @@ -1482,10 +1385,10 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): for i in range(len(self.reaction_list)): for j in range(len(self.reaction_list)): - for q in self.kinetic_intermediate_uncertainties[i].keys(): - dlnk_i_dq = self.kinetic_intermediate_uncertainties[i][q] - for r in self.kinetic_intermediate_uncertainties[j].keys(): - dlnk_j_dr = self.kinetic_intermediate_uncertainties[j][r] + for q in self.dlnk_dqs[i].keys(): + dlnk_i_dq = self.dlnk_dqs[i][q] + for r in self.dlnk_dqs[j].keys(): + dlnk_j_dr = self.dlnk_dqs[j][r] self.kinetic_covariance_matrix[i, j] += dlnk_i_dq * k_param_engine._get_covariance_qq(q, r) * dlnk_j_dr return self.kinetic_covariance_matrix @@ -1494,7 +1397,7 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset """ Make an explicit covariance matrix of all the qs (intermediate thermo parameters, like specific groups or library entries) - Requires calling assign_intermediate_uncertainties first + Requires calling assign_parameter_uncertainties first if subset_indices is None, computes the full matrix. Otherwise, only computes the matrices relevant to the species indicated by subset_indices @@ -1502,8 +1405,8 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset if subset_indices is None: subset_indices = np.arange(len(self.species_list)) - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): - self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case + self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_thermo if g_param_engine is None: @@ -1511,7 +1414,7 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset self.all_thermo_intermediates = set() for sp_idx in subset_indices: - for q in self.thermo_intermediate_uncertainties[sp_idx].keys(): + for q in self.dG_dqs[sp_idx].keys(): self.all_thermo_intermediates.add(q) self.all_thermo_intermediates = list(self.all_thermo_intermediates) W = len(self.all_thermo_intermediates) @@ -1529,7 +1432,7 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs """ Make an explicit covariance matrix of all the qs (intermediate kinetic parameters, like specific rate rules or libraries entries) - Requires calling assign_intermediate_uncertainties first + Requires calling assign_parameter_uncertainties first if subset_indices is None, computes the full matrix. Otherwise, only computes the matrices relevant to the reactions indicated by subset_indices @@ -1537,9 +1440,8 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs if subset_indices is None: subset_indices = np.arange(len(self.reaction_list)) - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): - # TODO this might have to be squared - self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case + self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_kinetics if k_param_engine is None: @@ -1547,7 +1449,7 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs self.all_kinetics_intermediates = set() for rxn_idx in subset_indices: - for q in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + for q in self.dlnk_dqs[rxn_idx].keys(): self.all_kinetics_intermediates.add(q) self.all_kinetics_intermediates = list(self.all_kinetics_intermediates) W = len(self.all_kinetics_intermediates) @@ -1578,15 +1480,15 @@ def _get_dG_dq_matrix(self, subset_indices=None): subset_indices = np.arange(len(self.species_list)) # return a square identity matrix if uncorrelated - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) dGdq = np.zeros((len(subset_indices), len(self.all_thermo_intermediates))) for i, sp_idx in enumerate(subset_indices): - for key in self.thermo_intermediate_uncertainties[sp_idx].keys(): + for key in self.dG_dqs[sp_idx].keys(): q_index = self.all_thermo_intermediates.index(key) - dGdq[i, q_index] = self.thermo_intermediate_uncertainties[sp_idx][key] + dGdq[i, q_index] = self.dG_dqs[sp_idx][key] return dGdq @@ -1603,15 +1505,15 @@ def _get_dlnk_dq_matrix(self, subset_indices=None): subset_indices = np.arange(len(self.reaction_list)) # return a square identity matrix if uncorrelated - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) dlnkdq = np.zeros((len(subset_indices), len(self.all_kinetics_intermediates))) for i, rxn_idx in enumerate(subset_indices): - for key in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + for key in self.dlnk_dqs[rxn_idx].keys(): q_index = self.all_kinetics_intermediates.index(key) - dlnkdq[i, q_index] = self.kinetic_intermediate_uncertainties[rxn_idx][key] + dlnkdq[i, q_index] = self.dlnk_dqs[rxn_idx][key] return dlnkdq diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 573239ce48..51eb7953ef 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -179,62 +179,64 @@ def test_uncertainty_assignment(self): np.testing.assert_allclose(thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) np.testing.assert_allclose(kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) - # ---------------------------- Now repeat for assign_intermediate_uncertainties ----------------------------- + # ---------------------------- Now repeat for new formulation ----------------------------- # uncorrelated - self.uncertainty.assign_intermediate_uncertainties(correlated=False) - intermediate_thermo_unc = self.uncertainty.thermo_intermediate_uncertainties - intermediate_kinetic_unc = self.uncertainty.kinetic_intermediate_uncertainties - np.testing.assert_allclose(intermediate_thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) - np.testing.assert_allclose(intermediate_kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + self.uncertainty.assign_parameter_uncertainties(correlated=False) + # use covariance matrices as test + thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) + kinetic_covariance = np.sqrt(self.uncertainty.get_kinetic_covariance_matrix().diagonal()) + assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() + assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() + # correlated - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) # do a spot check on some of the intermediates (dG/dq) these are derivatives, not uncertainties # Thermo library example - assert self.uncertainty.thermo_intermediate_uncertainties[0].keys() == {'Library O(0)'} - assert self.uncertainty.thermo_intermediate_uncertainties[0]['Library O(0)'] == 1 + assert self.uncertainty.dG_dqs[0].keys() == {'Library O(0)'} + assert self.uncertainty.dG_dqs[0]['Library O(0)'] == 1 # Thermo GAV example - assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Estimation HO2(2)'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(group) O2s-OsH'] == 2 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(other) R'] == 2 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(radical) HOOJ'] == 1 + assert tuple(sorted(self.uncertainty.dG_dqs[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') + assert self.uncertainty.dG_dqs[2]['Estimation HO2(2)'] == 1 + assert self.uncertainty.dG_dqs[2]['Group(group) O2s-OsH'] == 2 + assert self.uncertainty.dG_dqs[2]['Group(other) R'] == 2 + assert self.uncertainty.dG_dqs[2]['Group(radical) HOOJ'] == 1 # Thermo library + GAV - assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Estimation CH3(14)'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Group(radical) CH3'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Library CH4(16)'] == 1 + assert tuple(sorted(self.uncertainty.dG_dqs[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') + assert self.uncertainty.dG_dqs[14]['Estimation CH3(14)'] == 1 + assert self.uncertainty.dG_dqs[14]['Group(radical) CH3'] == 1 + assert self.uncertainty.dG_dqs[14]['Library CH4(16)'] == 1 # Kinetics library - assert self.uncertainty.kinetic_intermediate_uncertainties[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} + assert self.uncertainty.dlnk_dqs[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} # Rate rule (exact) - assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') - assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 - assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 + assert tuple(sorted(self.uncertainty.dlnk_dqs[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') + assert self.uncertainty.dlnk_dqs[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 + assert self.uncertainty.dlnk_dqs[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 # Rate rule (non-exact, multiple rule weights) - assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[3].keys())) == ( + assert tuple(sorted(self.uncertainty.dlnk_dqs[3].keys())) == ( 'Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', 'Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', 'Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs', 'Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad', ) - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 - assert np.isclose(self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 + assert self.uncertainty.dlnk_dqs[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 + assert np.isclose(self.uncertainty.dlnk_dqs[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) + assert self.uncertainty.dlnk_dqs[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 + assert self.uncertainty.dlnk_dqs[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 # Training reaction - assert self.uncertainty.kinetic_intermediate_uncertainties[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} - assert self.uncertainty.kinetic_intermediate_uncertainties[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 + assert self.uncertainty.dlnk_dqs[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} + assert self.uncertainty.dlnk_dqs[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 # PDEP - assert self.uncertainty.kinetic_intermediate_uncertainties[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} - assert self.uncertainty.kinetic_intermediate_uncertainties[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 + assert self.uncertainty.dlnk_dqs[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} + assert self.uncertainty.dlnk_dqs[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 # correlated uncertainties should match uncorrelated, so check diagonal of covariance matrix thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) @@ -410,7 +412,7 @@ def test_local_analysis(self): # -------------------- repeat the exact same test for new formulation -------------------------- # uncorrelated analysis first - self.uncertainty.assign_intermediate_uncertainties() + self.uncertainty.assign_parameter_uncertainties(correlated=False) output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species) total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] assert np.isclose(total_variance, expected_uncorrelated_total_variance) @@ -431,7 +433,7 @@ def test_local_analysis(self): assert sorted_thermo_names == expected_uncorrelated_thermo_labels # now repeat for correlated analysis - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species, correlated=True) total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] assert np.isclose(total_variance, expected_correlated_total_variance) @@ -467,11 +469,11 @@ def test_covariance_matrices(self): uncorrelated_thermo_inputs = np.array(self.uncertainty.thermo_input_uncertainties) uncorrelated_kinetic_inputs = np.array(self.uncertainty.kinetic_input_uncertainties) - self.uncertainty.assign_intermediate_uncertainties(correlated=False) + self.uncertainty.assign_parameter_uncertainties(correlated=False) uncorrelated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() uncorrelated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) correlated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() correlated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() Sigma_ww_thermo = self.uncertainty._get_intermediate_thermo_covariance_matrix()