diff --git a/osf/management/commands/migrate_osfmetrics_6to8.py b/osf/management/commands/migrate_osfmetrics_6to8.py index b1081d55ddf..49396d36ba3 100644 --- a/osf/management/commands/migrate_osfmetrics_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_6to8.py @@ -1,6 +1,8 @@ import collections import datetime import functools +import heapq +import itertools import logging from django.apps import apps @@ -28,10 +30,7 @@ ) from osf.metrics import reports as es6_reports from osf.metrics import es8_metrics, RegistriesModerationMetrics -from osf.metrics.reporters.public_item_usage import ( - _iter_composite_bucket_keys, - _zip_sorted, -) +from osf.metrics.reporters.public_item_usage import _iter_composite_bucket_keys from osf.metrics.utils import ( YearMonth, get_database_iri, @@ -145,6 +144,17 @@ def migrate_preprint_downloads(from_when: str, until_when: str): _es8_bulk_save(es8_metrics.OsfCountedUsageEvent, _each_new) +@celery_app.task(**_TASK_KWARGS) +def schedule_migrate_usage_reports(until_when: str): + for _osfid in _merge_sorted_osfids( + _each_usage_report_osfid(until_when=until_when), + _each_countedusage_osfid(until_when=until_when), + _each_preprintview_osfid(until_when=until_when), + _each_preprintdownload_osfid(until_when=until_when), + ): + migrate_usage_reports.delay(_osfid, until_when) + + @celery_app.task(**_TASK_KWARGS) def migrate_usage_reports(osfid: str, until_when: str): # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8 @@ -152,28 +162,22 @@ def migrate_usage_reports(osfid: str, until_when: str): _item_is_component = is_osf_component(_osfobj) if _osfobj else False def _each_new(): - # go in sorted order to build cumulative counts - # (only a few dozen of these per item; should be fine to sort and load all at once) _each_hit = _es6_scan_range( es6_reports.PublicItemUsageReport, until_when=until_when, - addl_filter={'term': {'item_osfid': osfid}}, - sort='report_yearmonth', + addl_filter={'terms': {'item_osfid': _synonymous_osfids(osfid)}}, ) + # (only a few dozen of these per item; should be fine to load all at once) _hits = list(_each_hit) if _osfobj and not _hits: # this item has usages, but only before the monthly usage reparts started # -- create one for cumulative counts (if the object still exists) yield _backfill_old_usage_report(_osfobj, _item_is_component, until_when) else: - _prior_report = None for _hit in _hits: - yield ( - _prior_report := _convert_public_usage_report( - _hit['_source'], - _prior_report, - item_is_component=_item_is_component, - ) + yield _convert_public_usage_report( + _hit['_source'], + item_is_component=_item_is_component, ) _es8_bulk_save(es8_metrics.MonthlyPublicItemUsageReportEs8, _each_new()) @@ -213,7 +217,6 @@ def _es6_scan_range( from_when: str = '', until_when: str, addl_filter=None, - sort=None, ): _timestamp_range = {'lt': until_when} if from_when: @@ -224,8 +227,6 @@ def _es6_scan_range( if addl_filter: _filters.append(addl_filter) _query_body = {'query': {'bool': {'filter': _filters}}} - if sort: - _query_body['sort'] = sort return es6_helpers.scan( _es6_connection(), index=es6_recordtype._template_pattern, @@ -391,26 +392,13 @@ def _convert_preprint_metric( def _convert_public_usage_report( source: dict, - prior_report: es8_metrics.MonthlyPublicItemUsageReportEs8 | None, item_is_component: bool, ) -> es8_metrics.MonthlyPublicItemUsageReportEs8: - if prior_report is None: - _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( - osfid=source['item_osfid'], - until_when=YearMonth.from_str(source['report_yearmonth']).month_end(), - is_preprint=(source.get('item_type') == 'preprint'), - ) - else: - _c_views = prior_report.cumulative_view_count + source.get('view_count', 0) - _c_view_sess = prior_report.cumulative_view_session_count + ( - source.get('view_session_count', 0) or source.get('view_count', 0) - ) - _c_downloads = prior_report.cumulative_download_count + source.get( - 'download_count', 0 - ) - _c_download_sess = prior_report.cumulative_download_session_count + ( - source.get('download_session_count', 0) or source.get('download_count') - ) + _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( + osfid=source['item_osfid'], + until_when=YearMonth.from_str(source['report_yearmonth']).month_end(), + is_preprint=('preprint' in source.get('item_type', ())), + ) return es8_metrics.MonthlyPublicItemUsageReportEs8( cycle_coverage=_semverish_from_yearmonth(source['report_yearmonth']), item_iri=osfid_iri(source['item_osfid']), @@ -426,11 +414,11 @@ def _convert_public_usage_report( provider_ids=source.get('provider_id'), platform_iris=source.get('platform_iri') or [website_settings.DOMAIN], view_count=source.get('view_count', 0), - view_session_count=source.get('view_session_count', 0), + view_session_count=source.get('view_session_count') or source.get('view_count', 0), cumulative_view_count=_c_views, cumulative_view_session_count=_c_view_sess or _c_views, download_count=source.get('download_count', 0), - download_session_count=source.get('download_session_count', 0), + download_session_count=source.get('download_session_count') or source.get('download_count', 0), cumulative_download_count=_c_downloads, cumulative_download_session_count=_c_download_sess or _c_downloads, ) @@ -541,26 +529,32 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]: def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str) -> int: - '''aggregate views on each preprint''' + '''aggregate counts on given preprint''' # copied/adapted from osf.metrics.preprint_metrics - _preprint_ids = [osfid] - if osfid.endswith('_v1'): - # include pre-versioned-guid counts for v1 - _preprint_ids.append(osfid.removesuffix('_v1')) _search = ( preprint_metric_cls.search() - .filter('terms', preprint_id=_preprint_ids) + .filter('terms', preprint_id=_synonymous_osfids(osfid)) .filter('range', timestamp={'lt': until_when}) .extra(size=0) # no hits; only aggs ) _search.aggs.metric('agg_count', 'sum', field='count') _response = _search.execute() - _view_count = ( + return ( int(_response.aggregations.agg_count.value) if hasattr(_response.aggregations, 'agg_count') else 0 ) - return _view_count + + +def _synonymous_osfids(osfid: str) -> list[str]: + _synonyms = [osfid] + if osfid.endswith('_v1'): + # include pre-versioned-guid counts for v1 + _synonyms.append(osfid.removesuffix('_v1')) + elif '_' not in osfid: + # include v1 (if it exists) with unversioned guid + _synonyms.append(f'{osfid}_v1') + return _synonyms def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_items: bool): @@ -675,6 +669,20 @@ def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.ab return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) +def _merge_sorted_osfids(*osfid_iterables): + def _osfids_group_key(osfid: str): + return ( # v1 same as unversioned + osfid.removesuffix('_v1') + if osfid.endswith('_v1') + else osfid + ) + for _k, _g in itertools.groupby( + heapq.merge(*osfid_iterables), + key=_osfids_group_key, + ): + yield _k + + ### # the command itself @@ -732,12 +740,12 @@ def handle( self._clear_es8_data(unchanged, usage_events, usage_reports) self._check_started_at(start_now=start) _default_all = not any((unchanged, usage_events, usage_reports)) - if unchanged or _default_all: - self._handle_unchanged(start=start, no_counts=no_counts) - if usage_events or _default_all: - self._handle_usage_events(start=start, no_counts=no_counts) if usage_reports or _default_all: self._handle_usage_reports(start=start, no_counts=no_counts) + if usage_events or _default_all: + self._handle_usage_events(start=start, no_counts=no_counts) + if unchanged or _default_all: + self._handle_unchanged(start=start, no_counts=no_counts) if not no_counts: self.stdout.write('(counts may be approximate)') @@ -845,15 +853,7 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool): self.stdout.write( f'starting per-item {es6_reports.PublicItemUsageReport.__name__} => {es8_metrics.MonthlyPublicItemUsageReportEs8.__name__}' ) - for _osfid in _zip_sorted( - _each_usage_report_osfid(until_when=self._migration_started_at), - _each_countedusage_osfid(until_when=self._migration_started_at), - _each_preprintview_osfid(until_when=self._migration_started_at), - _each_preprintdownload_osfid(until_when=self._migration_started_at), - ): - migrate_usage_reports.delay( - _osfid, self._migration_started_at.isoformat() - ) + schedule_migrate_usage_reports.delay(self._migration_started_at.isoformat()) def _check_started_at(self, start_now): _started_at = self._migration_started_at @@ -885,9 +885,9 @@ def _clear_es8_data(self, unchanged, usage_events, usage_reports): if _default_all or unchanged: _to_clear.extend(_UNCHANGED_RECORDTYPES.values()) if _default_all or usage_events: - _to_clear.append(es8_metrics.MonthlyPublicItemUsageReportEs8) - if _default_all or usage_reports: _to_clear.append(es8_metrics.OsfCountedUsageEvent) + if _default_all or usage_reports: + _to_clear.append(es8_metrics.MonthlyPublicItemUsageReportEs8) for _es8_recordtype in _to_clear: self.stdout.write( f'clearing {_es8_recordtype.__name__}', self.style.NOTICE diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py index ce33e24e87d..985a1213be2 100644 --- a/osf/metrics/reporters/public_item_usage.py +++ b/osf/metrics/reporters/public_item_usage.py @@ -14,6 +14,7 @@ from osf.metrics.counted_usage import ( CountedAuthUsage, get_provider_id, + get_item_type as get_legacy_item_type, ) from osf.metrics.preprint_metrics import ( PreprintDownload, @@ -80,7 +81,7 @@ def report(self, **report_kwargs): raise _SkipItem _report_es6 = PublicItemUsageReport( item_osfid=_report.item_osfids[0], - item_type=list(_report.item_types), + item_type=[get_legacy_item_type(_obj)], provider_id=list(_report.provider_ids), platform_iri=list(_report.platform_iris), view_count=_report.view_count,