Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion admin/management/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def post(self, request):
class MigrateOsfmetrics6to8(ManagementCommandPermissionView):
def post(self, request):
_command_kwargs = {
'no_setup': True,
'no_color': True,
'no_counts': request.POST.get('no_counts'),
'clear_state': request.POST.get('clear_state'),
Expand Down
168 changes: 130 additions & 38 deletions osf/management/commands/migrate_osfmetrics_6to8.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging

from django.apps import apps
from django.core.management import call_command
from django.core.management.base import BaseCommand
from django.db import OperationalError as DjangoOperationalError
from elasticsearch6.exceptions import ConnectionError as Elastic6ConnectionError
Expand All @@ -18,16 +17,28 @@

from framework.celery_tasks import app as celery_app
from osf.metadata.rdfutils import OSF
from osf.metadata.osfmap_utils import osfmap_type_from_model, is_osf_component
from osf.metadata.osfmap_utils import is_osf_component
from osf.metrics.preprint_metrics import (
PreprintView,
PreprintDownload,
)
from osf.metrics.counted_usage import CountedAuthUsage as CountedUsageEs6
from osf.metrics.counted_usage import (
CountedAuthUsage as CountedUsageEs6,
get_provider_id,
)
from osf.metrics import reports as es6_reports
from osf.metrics import es8_metrics, RegistriesModerationMetrics
from osf.metrics.reporters.public_item_usage import _iter_composite_bucket_keys
from osf.metrics.utils import YearMonth
from osf.metrics.reporters.public_item_usage import (
_iter_composite_bucket_keys,
_zip_sorted,
)
from osf.metrics.utils import (
YearMonth,
get_database_iri,
get_item_type,
get_item_type_from_model,
get_item_type_from_iri,
)
from osf import models as osfdb
from osf.models.base import osfid_iri
from website import settings as website_settings
Expand All @@ -42,6 +53,8 @@

_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control

_COMPOSITE_CHUNK_SIZE = 500

_UNCHANGED_RECORDTYPES = {
# reports
es6_reports.StorageAddonUsage: es8_metrics.DailyStorageAddonUsageReportEs8,
Expand Down Expand Up @@ -135,8 +148,8 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
@celery_app.task(**_TASK_KWARGS)
def migrate_usage_reports(osfid: str, until_when: str):
# from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8
_osfguid = osfdb.Guid.load(osfid)
_item_is_component = is_osf_component(_osfguid.referent) if _osfguid else False
_osfobj, _ = osfdb.Guid.load_referent(osfid)
_item_is_component = is_osf_component(_osfobj) if _osfobj else False

def _each_new():
# go in sorted order to build cumulative counts
Expand All @@ -147,15 +160,21 @@ def _each_new():
addl_filter={'term': {'item_osfid': osfid}},
sort='report_yearmonth',
)
_prior_report = None
for _hit in list(_each_hit):
yield (
_prior_report := _convert_public_usage_report(
_hit['_source'],
_prior_report,
item_is_component=_item_is_component,
_hits = list(_each_hit)
if _osfobj and not _hits:
# this item has usages, but only before the monthly usage reports started
# -- create one for cumulative counts (if the object still exists)
yield _backfill_old_usage_report(_osfobj, _item_is_component, until_when)
else:
_prior_report = None
for _hit in _hits:
yield (
_prior_report := _convert_public_usage_report(
_hit['_source'],
_prior_report,
item_is_component=_item_is_component,
)
)
)

_es8_bulk_save(es8_metrics.MonthlyPublicItemUsageReportEs8, _each_new())

Expand Down Expand Up @@ -289,8 +308,8 @@ def _assert_field_unchangedness(es6_recordtype, es8_recordtype):
assert _es6_fields == _es8_fields


def _semverish_from_yearmonth(given_yearmonth: str):
_ym = YearMonth.from_str(given_yearmonth)
def _semverish_from_yearmonth(given_yearmonth):
_ym = YearMonth.from_any(given_yearmonth)
return f'{_ym.year}.{_ym.month}'


Expand Down Expand Up @@ -362,7 +381,7 @@ def _convert_preprint_metric(
),
# fields from OsfCountedUsageEvent:
item_osfid=_source['preprint_id'],
item_type=OSF.Preprint,
item_type='Preprint',
item_public=True,
provider_id=_source.get('provider_id'),
user_is_authenticated=bool(_source.get('user_id')),
Expand All @@ -383,14 +402,14 @@ def _convert_public_usage_report(
)
else:
_c_views = prior_report.cumulative_view_count + source.get('view_count', 0)
_c_view_sess = prior_report.cumulative_view_session_count + source.get(
'view_session_count', 0
_c_view_sess = prior_report.cumulative_view_session_count + (
source.get('view_session_count', 0) or source.get('view_count', 0)
)
_c_downloads = prior_report.cumulative_download_count + source.get(
'download_count', 0
)
_c_download_sess = prior_report.cumulative_download_session_count + source.get(
'download_session_count', 0
_c_download_sess = prior_report.cumulative_download_session_count + (
source.get('download_session_count', 0) or source.get('download_count')
)
return es8_metrics.MonthlyPublicItemUsageReportEs8(
cycle_coverage=_semverish_from_yearmonth(source['report_yearmonth']),
Expand All @@ -409,11 +428,38 @@ def _convert_public_usage_report(
view_count=source.get('view_count', 0),
view_session_count=source.get('view_session_count', 0),
cumulative_view_count=_c_views,
cumulative_view_session_count=_c_view_sess,
cumulative_view_session_count=_c_view_sess or _c_views,
download_count=source.get('download_count', 0),
download_session_count=source.get('download_session_count', 0),
cumulative_download_count=_c_downloads,
cumulative_download_session_count=_c_download_sess,
cumulative_download_session_count=_c_download_sess or _c_downloads,
)


def _backfill_old_usage_report(osf_obj, is_component: bool, until_when: str):
    """Build a single catch-up usage report carrying only cumulative counts.

    The report is dated to the month before the one containing `until_when`.
    Its per-month counts are all zero; its cumulative counts come from
    `_get_cumulative_usage`, queried up to that month's `month_end()`.

    Args:
        osf_obj: the osf item to report on (its `_id` is used as the osfid).
        is_component: accepted for the caller's convenience; not read here.
        until_when: ISO-8601 datetime string marking the migration cutoff.

    Returns:
        an unsaved `MonthlyPublicItemUsageReportEs8` instance.
    """
    _cutoff = datetime.datetime.fromisoformat(until_when)
    _report_month = YearMonth.from_date(_cutoff).prior()
    _cumulative_counts = _get_cumulative_usage(
        osfid=osf_obj._id,
        until_when=_report_month.month_end().isoformat(),
        is_preprint=isinstance(osf_obj, osfdb.Preprint),
    )
    _views, _view_sessions, _downloads, _download_sessions = _cumulative_counts
    return es8_metrics.MonthlyPublicItemUsageReportEs8(
        cycle_coverage=_semverish_from_yearmonth(_report_month),
        item_iri=osfid_iri(osf_obj._id),
        item_osfids=[osf_obj._id],
        item_types=[get_item_type(osf_obj)],
        provider_ids=[get_provider_id(osf_obj)],
        database_iris=[get_database_iri(osf_obj)],
        platform_iris=[website_settings.DOMAIN],
        # nothing is counted for the backfilled month itself
        view_count=0,
        view_session_count=0,
        cumulative_view_count=_views,
        # a falsy session count falls back to the plain count
        cumulative_view_session_count=_view_sessions or _views,
        download_count=0,
        download_session_count=0,
        cumulative_download_count=_downloads,
        cumulative_download_session_count=_download_sessions or _downloads,
    )


Expand Down Expand Up @@ -497,9 +543,13 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:
def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str) -> int:
'''aggregate views on each preprint'''
# copied/adapted from osf.metrics.preprint_metrics
_preprint_ids = [osfid]
if osfid.endswith('_v1'):
# include pre-versioned-guid counts for v1
_preprint_ids.append(osfid.removesuffix('_v1'))
_search = (
preprint_metric_cls.search()
.filter('term', preprint_id=osfid)
.filter('terms', preprint_id=_preprint_ids)
.filter('range', timestamp={'lt': until_when})
.extra(size=0) # no hits; only aggs
)
Expand All @@ -525,13 +575,13 @@ def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_it
def _convert_item_type(osf_model_name: str | None, has_surrounding_items: bool):
if osf_model_name:
try:
return osfmap_type_from_model(
return get_item_type_from_model(
apps.get_model('osf', osf_model_name),
is_component=has_surrounding_items,
)
except LookupError:
pass
return OSF.Object # fine, fallback to abstract type
return get_item_type_from_iri(OSF.Object) # fallback abstract osf:Object


def _convert_database_iri_list(provider_ids: list[str], osf_model_names: list[str]):
Expand Down Expand Up @@ -578,16 +628,58 @@ def _each_usage_report_osfid(until_when, after_osfid=None):
return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)


def _each_countedusage_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]:
    """Iterate the distinct `item_guid` values found in es6 counted-usage events.

    Only public-item events labeled 'view' or 'download' with a timestamp
    before `until_when` are considered. `after_osfid` is handed to
    `_iter_composite_bucket_keys` as `after` (presumably a resume point --
    confirm against that helper).
    """
    _usage_search = CountedUsageEs6.search()
    _usage_search = _usage_search.filter('term', item_public=True)
    _usage_search = _usage_search.filter('terms', action_labels=['view', 'download'])
    _usage_search = _usage_search.filter('range', timestamp={'lt': until_when})
    # aggregation buckets are all that's wanted, so request zero hits
    _usage_search = _usage_search.extra(size=0)
    _osfid_source = {'osfid': {'terms': {'field': 'item_guid'}}}
    _usage_search.aggs.bucket(
        'agg_osfid',
        'composite',
        sources=[_osfid_source],
        size=_COMPOSITE_CHUNK_SIZE,
    )
    return _iter_composite_bucket_keys(
        _usage_search,
        'agg_osfid',
        'osfid',
        after=after_osfid,
    )


def _each_preprintview_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]:
    """Iterate the distinct `preprint_id` values found in es6 `PreprintView` records.

    Only records with a timestamp before `until_when` are considered.
    `after_osfid` is handed to `_iter_composite_bucket_keys` as `after`
    (presumably a resume point -- confirm against that helper).
    """
    _view_search = PreprintView.search()
    _view_search = _view_search.filter('range', timestamp={'lt': until_when})
    # aggregation buckets are all that's wanted, so request zero hits
    _view_search = _view_search.extra(size=0)
    _osfid_source = {'osfid': {'terms': {'field': 'preprint_id'}}}
    _view_search.aggs.bucket(
        'agg_osfid',
        'composite',
        sources=[_osfid_source],
        size=_COMPOSITE_CHUNK_SIZE,
    )
    return _iter_composite_bucket_keys(
        _view_search,
        'agg_osfid',
        'osfid',
        after=after_osfid,
    )


def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]:
    """Iterate the distinct `preprint_id` values found in es6 `PreprintDownload` records.

    Only records with a timestamp before `until_when` are considered.
    `after_osfid` is handed to `_iter_composite_bucket_keys` as `after`
    (presumably a resume point -- confirm against that helper).
    """
    _download_search = PreprintDownload.search()
    _download_search = _download_search.filter('range', timestamp={'lt': until_when})
    # aggregation buckets are all that's wanted, so request zero hits
    _download_search = _download_search.extra(size=0)
    _osfid_source = {'osfid': {'terms': {'field': 'preprint_id'}}}
    _download_search.aggs.bucket(
        'agg_osfid',
        'composite',
        sources=[_osfid_source],
        size=_COMPOSITE_CHUNK_SIZE,
    )
    return _iter_composite_bucket_keys(
        _download_search,
        'agg_osfid',
        'osfid',
        after=after_osfid,
    )


###
# the command itself


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
'--no-setup',
action='store_true',
)
parser.add_argument(
'--no-counts',
action='store_true',
Expand Down Expand Up @@ -624,7 +716,6 @@ def _migration_started_at(self):
def handle(
self,
*,
no_setup,
no_counts,
clear_state,
clear_es8_data,
Expand All @@ -635,8 +726,6 @@ def handle(
**kwargs,
):
self._quiet_chatty_loggers()
if not no_setup:
call_command('djelme_backend_setup')
if clear_state:
self._clear_state()
if clear_es8_data:
Expand Down Expand Up @@ -750,14 +839,17 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
_es8_item_count,
style=self._eq_style(_es8_item_count, _es6_item_count),
)
# (if --start) schedule task per item (by composite agg on es6 public usage reports)
# (if --start) schedule task per item (by composite agg on es6 usage reports and events)
# each item-task iter thru reports oldest to newest, adding cumulative counts
if start:
self.stdout.write(
f'starting per-item {es6_reports.PublicItemUsageReport.__name__} => {es8_metrics.MonthlyPublicItemUsageReportEs8.__name__}'
)
for _osfid in _each_usage_report_osfid(
until_when=self._migration_started_at
for _osfid in _zip_sorted(
_each_usage_report_osfid(until_when=self._migration_started_at),
_each_countedusage_osfid(until_when=self._migration_started_at),
_each_preprintview_osfid(until_when=self._migration_started_at),
_each_preprintdownload_osfid(until_when=self._migration_started_at),
):
migrate_usage_reports.delay(
_osfid, self._migration_started_at.isoformat()
Expand Down
22 changes: 8 additions & 14 deletions osf/metrics/es8_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from elasticsearch_metrics import DAILY, MONTHLY, YEARLY
import elasticsearch_metrics.imps.elastic8 as djelme

from osf.metadata.osfmap_utils import (
osfmap_type,
osfid_from_iri,
)
from osf.metadata.osfmap_utils import osfid_from_iri
from osf.metrics.counted_usage import _get_surrounding_guids
from osf.metrics.utils import YearMonth
from osf.metrics.utils import (
YearMonth,
get_database_iri,
get_item_type,
)
from osf import models as osfdb
from osf.models.base import osfid_iri
from website import settings as website_settings
Expand Down Expand Up @@ -208,7 +209,7 @@ def _autofill_item_public(self):

def _autofill_item_type(self):
if self.item_osfid and not self.item_type:
self.item_type = osfmap_type(self._osfid_referent)
self.item_type = get_item_type(self._osfid_referent)

def _autofill_provider_id(self):
if self.item_osfid and not self.provider_id:
Expand Down Expand Up @@ -245,14 +246,7 @@ def _autofill_pageview(self):

def _autofill_database_iri(self):
if self.item_osfid and not self.database_iri:
_provider = getattr(self._osfid_referent, 'provider', None)
if not _provider:
self.database_iri = website_settings.DOMAIN
elif isinstance(_provider, str):
# file providers are a different thing that don't really have an iri, just an id
self.database_iri = f'urn:files.osf.io:{self.provider_id}'
else:
self.database_iri = _provider.get_semantic_iri()
self.database_iri = get_database_iri(self._osfid_referent)

def _clean_action_labels(self):
if self.action_labels:
Expand Down
9 changes: 7 additions & 2 deletions osf/metrics/reporters/public_item_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
from osf.metadata.osf_gathering import OsfmapPartition
from osf.metrics.counted_usage import (
CountedAuthUsage,
get_item_type,
get_provider_id,
)
from osf.metrics.preprint_metrics import (
PreprintDownload,
PreprintView,
)
from osf.metrics.reports import PublicItemUsageReport
from osf.metrics.utils import YearMonth, cycle_coverage_yearmonth
from osf.metrics.utils import (
YearMonth,
cycle_coverage_yearmonth,
get_database_iri,
get_item_type,
)
from osf import models as osfdb
from osf.models.base import osfid_iri
from website import settings as website_settings
Expand Down Expand Up @@ -160,6 +164,7 @@ def _init_report(self, osf_obj) -> MonthlyPublicItemUsageReportEs8:
item_osfids=[osf_obj._id],
item_types=[get_item_type(osf_obj)],
provider_ids=[get_provider_id(osf_obj)],
database_iris=[get_database_iri(osf_obj)],
platform_iris=[website_settings.DOMAIN],
# leave counts null; will be set if there's data
)
Expand Down
Loading