Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 61 additions & 61 deletions osf/management/commands/migrate_osfmetrics_6to8.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import collections
import datetime
import functools
import heapq
import itertools
import logging

from django.apps import apps
Expand Down Expand Up @@ -28,10 +30,7 @@
)
from osf.metrics import reports as es6_reports
from osf.metrics import es8_metrics, RegistriesModerationMetrics
from osf.metrics.reporters.public_item_usage import (
_iter_composite_bucket_keys,
_zip_sorted,
)
from osf.metrics.reporters.public_item_usage import _iter_composite_bucket_keys
from osf.metrics.utils import (
YearMonth,
get_database_iri,
Expand Down Expand Up @@ -145,35 +144,40 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
_es8_bulk_save(es8_metrics.OsfCountedUsageEvent, _each_new)


@celery_app.task(**_TASK_KWARGS)
def schedule_migrate_usage_reports(until_when: str):
for _osfid in _merge_sorted_osfids(
_each_usage_report_osfid(until_when=until_when),
_each_countedusage_osfid(until_when=until_when),
_each_preprintview_osfid(until_when=until_when),
_each_preprintdownload_osfid(until_when=until_when),
):
migrate_usage_reports.delay(_osfid, until_when)


@celery_app.task(**_TASK_KWARGS)
def migrate_usage_reports(osfid: str, until_when: str):
# from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8
_osfobj, _ = osfdb.Guid.load_referent(osfid)
_item_is_component = is_osf_component(_osfobj) if _osfobj else False

def _each_new():
# go in sorted order to build cumulative counts
# (only a few dozen of these per item; should be fine to sort and load all at once)
_each_hit = _es6_scan_range(
es6_reports.PublicItemUsageReport,
until_when=until_when,
addl_filter={'term': {'item_osfid': osfid}},
sort='report_yearmonth',
addl_filter={'terms': {'item_osfid': _synonymous_osfids(osfid)}},
)
# (only a few dozen of these per item; should be fine to load all at once)
_hits = list(_each_hit)
if _osfobj and not _hits:
# this item has usages, but only before the monthly usage reparts started
# -- create one for cumulative counts (if the object still exists)
yield _backfill_old_usage_report(_osfobj, _item_is_component, until_when)
else:
_prior_report = None
for _hit in _hits:
yield (
_prior_report := _convert_public_usage_report(
_hit['_source'],
_prior_report,
item_is_component=_item_is_component,
)
yield _convert_public_usage_report(
_hit['_source'],
item_is_component=_item_is_component,
)

_es8_bulk_save(es8_metrics.MonthlyPublicItemUsageReportEs8, _each_new())
Expand Down Expand Up @@ -213,7 +217,6 @@ def _es6_scan_range(
from_when: str = '',
until_when: str,
addl_filter=None,
sort=None,
):
_timestamp_range = {'lt': until_when}
if from_when:
Expand All @@ -224,8 +227,6 @@ def _es6_scan_range(
if addl_filter:
_filters.append(addl_filter)
_query_body = {'query': {'bool': {'filter': _filters}}}
if sort:
_query_body['sort'] = sort
return es6_helpers.scan(
_es6_connection(),
index=es6_recordtype._template_pattern,
Expand Down Expand Up @@ -391,26 +392,13 @@ def _convert_preprint_metric(

def _convert_public_usage_report(
source: dict,
prior_report: es8_metrics.MonthlyPublicItemUsageReportEs8 | None,
item_is_component: bool,
) -> es8_metrics.MonthlyPublicItemUsageReportEs8:
if prior_report is None:
_c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage(
osfid=source['item_osfid'],
until_when=YearMonth.from_str(source['report_yearmonth']).month_end(),
is_preprint=(source.get('item_type') == 'preprint'),
)
else:
_c_views = prior_report.cumulative_view_count + source.get('view_count', 0)
_c_view_sess = prior_report.cumulative_view_session_count + (
source.get('view_session_count', 0) or source.get('view_count', 0)
)
_c_downloads = prior_report.cumulative_download_count + source.get(
'download_count', 0
)
_c_download_sess = prior_report.cumulative_download_session_count + (
source.get('download_session_count', 0) or source.get('download_count')
)
_c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage(
osfid=source['item_osfid'],
until_when=YearMonth.from_str(source['report_yearmonth']).month_end(),
is_preprint=('preprint' in source.get('item_type', ())),
)
return es8_metrics.MonthlyPublicItemUsageReportEs8(
cycle_coverage=_semverish_from_yearmonth(source['report_yearmonth']),
item_iri=osfid_iri(source['item_osfid']),
Expand All @@ -426,11 +414,11 @@ def _convert_public_usage_report(
provider_ids=source.get('provider_id'),
platform_iris=source.get('platform_iri') or [website_settings.DOMAIN],
view_count=source.get('view_count', 0),
view_session_count=source.get('view_session_count', 0),
view_session_count=source.get('view_session_count') or source.get('view_count', 0),
cumulative_view_count=_c_views,
cumulative_view_session_count=_c_view_sess or _c_views,
download_count=source.get('download_count', 0),
download_session_count=source.get('download_session_count', 0),
download_session_count=source.get('download_session_count') or source.get('download_count', 0),
cumulative_download_count=_c_downloads,
cumulative_download_session_count=_c_download_sess or _c_downloads,
)
Expand Down Expand Up @@ -541,26 +529,32 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:


def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str) -> int:
'''aggregate views on each preprint'''
'''aggregate counts on given preprint'''
# copied/adapted from osf.metrics.preprint_metrics
_preprint_ids = [osfid]
if osfid.endswith('_v1'):
# include pre-versioned-guid counts for v1
_preprint_ids.append(osfid.removesuffix('_v1'))
_search = (
preprint_metric_cls.search()
.filter('terms', preprint_id=_preprint_ids)
.filter('terms', preprint_id=_synonymous_osfids(osfid))
.filter('range', timestamp={'lt': until_when})
.extra(size=0) # no hits; only aggs
)
_search.aggs.metric('agg_count', 'sum', field='count')
_response = _search.execute()
_view_count = (
return (
int(_response.aggregations.agg_count.value)
if hasattr(_response.aggregations, 'agg_count')
else 0
)
return _view_count


def _synonymous_osfids(osfid: str) -> list[str]:
_synonyms = [osfid]
if osfid.endswith('_v1'):
# include pre-versioned-guid counts for v1
_synonyms.append(osfid.removesuffix('_v1'))
elif '_' not in osfid:
# include v1 (if it exists) with unversioned guid
_synonyms.append(f'{osfid}_v1')
return _synonyms


def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_items: bool):
Expand Down Expand Up @@ -675,6 +669,20 @@ def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.ab
return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)


def _merge_sorted_osfids(*osfid_iterables):
def _osfids_group_key(osfid: str):
return ( # v1 same as unversioned
osfid.removesuffix('_v1')
if osfid.endswith('_v1')
else osfid
)
for _k, _g in itertools.groupby(
heapq.merge(*osfid_iterables),
key=_osfids_group_key,
):
yield _k


###
# the command itself

Expand Down Expand Up @@ -732,12 +740,12 @@ def handle(
self._clear_es8_data(unchanged, usage_events, usage_reports)
self._check_started_at(start_now=start)
_default_all = not any((unchanged, usage_events, usage_reports))
if unchanged or _default_all:
self._handle_unchanged(start=start, no_counts=no_counts)
if usage_events or _default_all:
self._handle_usage_events(start=start, no_counts=no_counts)
if usage_reports or _default_all:
self._handle_usage_reports(start=start, no_counts=no_counts)
if usage_events or _default_all:
self._handle_usage_events(start=start, no_counts=no_counts)
if unchanged or _default_all:
self._handle_unchanged(start=start, no_counts=no_counts)
if not no_counts:
self.stdout.write('(counts may be approximate)')

Expand Down Expand Up @@ -845,15 +853,7 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
self.stdout.write(
f'starting per-item {es6_reports.PublicItemUsageReport.__name__} => {es8_metrics.MonthlyPublicItemUsageReportEs8.__name__}'
)
for _osfid in _zip_sorted(
_each_usage_report_osfid(until_when=self._migration_started_at),
_each_countedusage_osfid(until_when=self._migration_started_at),
_each_preprintview_osfid(until_when=self._migration_started_at),
_each_preprintdownload_osfid(until_when=self._migration_started_at),
):
migrate_usage_reports.delay(
_osfid, self._migration_started_at.isoformat()
)
schedule_migrate_usage_reports.delay(self._migration_started_at.isoformat())

def _check_started_at(self, start_now):
_started_at = self._migration_started_at
Expand Down Expand Up @@ -885,9 +885,9 @@ def _clear_es8_data(self, unchanged, usage_events, usage_reports):
if _default_all or unchanged:
_to_clear.extend(_UNCHANGED_RECORDTYPES.values())
if _default_all or usage_events:
_to_clear.append(es8_metrics.MonthlyPublicItemUsageReportEs8)
if _default_all or usage_reports:
_to_clear.append(es8_metrics.OsfCountedUsageEvent)
if _default_all or usage_reports:
_to_clear.append(es8_metrics.MonthlyPublicItemUsageReportEs8)
for _es8_recordtype in _to_clear:
self.stdout.write(
f'clearing {_es8_recordtype.__name__}', self.style.NOTICE
Expand Down
3 changes: 2 additions & 1 deletion osf/metrics/reporters/public_item_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from osf.metrics.counted_usage import (
CountedAuthUsage,
get_provider_id,
get_item_type as get_legacy_item_type,
)
from osf.metrics.preprint_metrics import (
PreprintDownload,
Expand Down Expand Up @@ -80,7 +81,7 @@ def report(self, **report_kwargs):
raise _SkipItem
_report_es6 = PublicItemUsageReport(
item_osfid=_report.item_osfids[0],
item_type=list(_report.item_types),
item_type=[get_legacy_item_type(_obj)],
provider_id=list(_report.provider_ids),
platform_iri=list(_report.platform_iris),
view_count=_report.view_count,
Expand Down