Spike: Figure out if we have everything we need in Spark to emit page_change_late events
Closed, Resolved (Public)

Description

In T367810: Spike: Can we recreate a skeleton page_change (revision_change) event from DB replica alone?, we figured that we had most of the context necessary to emit reconciliation events that look like page_change.

We are considering dropping the idea of T368782: MediaWiki Reconciliation API and simply emitting such events directly from Spark via EventGate.

In this task we want to confirm with code that this is possible.

Done is:

  • confirm whether we can emit a page_change-like event that carries only what the Dumps 2.0 use case needs.
    • if not possible, what would a skeleton event look like?
  • such an event would only be missing the page content, the redirect_title, and the content_format. Those would be resolved via T368787: Flink job to enrich reconciliation events

Event Timeline

such event would only miss the page content and the redirect_title

And page_change_kind? and maybe changelog_kind?

Looking at T367810#9946776, revision.content_slots is missing too, but maybe you can get that after all?

And page_change_kind? and maybe changelog_kind?

This, I would assume, can be determined via a heuristic by querying the replica. It is the same approach and semantics we have been discussing in T368782: MediaWiki Reconciliation API. The backend is the same; what changes if we go down the Spark route is how events are serialized and produced to Kafka (we won't use EventBus).
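To make the "serialized and produced without EventBus" part concrete, here is a minimal sketch of how a batch of events might be wrapped for an HTTP POST to EventGate. It assumes EventGate accepts a JSON array on a /v1/events endpoint and routes on meta.stream; the host name and the stream name below are hypothetical placeholders, not values from this task. The request is only built, never sent.

```python
import json
import urllib.request

# Hypothetical values: neither the URL nor the stream name comes from this task.
EVENTGATE_URL = "https://eventgate.example.org/v1/events"
STREAM_NAME = "example.reconciliation_stream"


def build_eventgate_request(events, stream=STREAM_NAME, url=EVENTGATE_URL):
    """Set meta.stream on a copy of each event and build (not send) a POST request."""
    payload = []
    for event in events:
        wrapped = dict(event)
        meta = dict(wrapped.get("meta", {}))
        meta["stream"] = stream
        wrapped["meta"] = meta
        payload.append(wrapped)
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_eventgate_request(
    [{"$schema": "/mediawiki/page/late/1.0.0", "wiki_id": "etwiki"}]
)
# urllib.request.urlopen(request) would send it; omitted so the sketch has no side effects.
```

In a Spark job this would most likely run per partition, so that each executor posts its own batches rather than collecting everything on the driver.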

nit: this said, I would prefer not to use page_change to describe the Dumps 2.0 streams.
We discussed using the page_change schema for the payload, but the stream semantics will be specific to the Dumps 2.0 use case. We don't need to be correct, just correct enough for downstream dumps consumers :).

nit: this said, I would prefer not to use page_change to describe the Dumps 2.0 streams.

Generally +1. But, do you think we can use approximately the same data model, perhaps even using some of the MW state entity schema fragments? Where it is easy to do so anyway?

Using the same data model would be our goal. This spike is investigating feasibility :)

My working assumption on Flink/ESC configs is that the stream will have page_change events.

xcollazo changed the task status from Open to In Progress. Sep 6 2024, 3:38 PM
xcollazo triaged this task as High priority.

Looking at T367810#9946776, revision.content_slots is missing too, but maybe you can get that after all?

Yes we can. It is ugly, but we can.

TL;DR: To closely follow page_change schema, we can get most required info for Dumps except as follows:

Needed by Dumps 2.0:

  • slots' content_format,
  • created_redirect_page

Both can be fetched from the MW API, as per discussions on Slack.

Required by page_change, but *not strictly* needed by Dumps:

  • page_change_kind
  • changelog_kind

A heuristic could be implemented, of course, but that is unnecessary for Dumps, so I did not attempt it. From the Dumps point of view, we could always set page_change_kind = edit and changelog_kind = update, or we could create a new schema that does not have these as required properties.
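The "always set edit / update" option can be sketched as a small post-processing step over the event dicts. The function name is hypothetical, and no heuristic is attempted here; values that are already present are left untouched so a later heuristic could still override the defaults.

```python
def with_default_kinds(event):
    """Return a copy of event with the two required kind fields filled in."""
    enriched = dict(event)
    # Defaults proposed for the Dumps use case; existing values win.
    enriched.setdefault("page_change_kind", "edit")
    enriched.setdefault("changelog_kind", "update")
    return enriched


event = with_default_kinds({"wiki_id": "etwiki", "revision": {"rev_id": 5063002}})
```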

Code details:

Here is example SQL code that generates a result set that follows page_change schema:

jsons = spark.sql("""
SELECT '/mediawiki/page/late/1.0.0' AS `$schema`,
       c.comment_text AS comment,
       DATE_FORMAT(TO_TIMESTAMP(r.rev_timestamp, 'yyyyMMddHHmmss'), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") AS dt,  -- HH (0-23), not kk (1-24): MW timestamps use hour 00 for midnight
       NAMED_STRUCT(
           'namespace_id', p.page_namespace,
           'page_id', p.page_id,
           'page_title', p.page_title
       ) AS page,
       NAMED_STRUCT(
           'user_id', a.actor_id,  -- is it actor_id we want, or user_id?
           'user_text', a.actor_name
       ) AS performer,
       NAMED_STRUCT(
           'comment', c.comment_text,
           'content_slots', MAP(  -- FIXME: currently spitting one map per slot instead of one MAP with all slots
                                sr.role_name, NAMED_STRUCT(
                                                    'content_body', NULL,
                                                    'content_format', NULL,  -- not persisted in MariaDB!
                                                    'content_model', cm.model_name,
                                                    'content_sha1', ct.content_sha1,
                                                    'content_size', ct.content_size,
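                                                    'origin_rev_id', r.rev_id,  -- NOTE: only right for slots changed in this revision; the MW slots table tracks the originating revision in slot_origin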
                                                    'slot_role', sr.role_name
                                                )
                            ),
           'editor', NAMED_STRUCT(
                         'user_id', a.actor_id,  -- is it actor_id we want, or user_id?
                         'user_text', a.actor_name
                     ),
           'is_comment_visible', (r.rev_deleted & 2) = 0,  -- DELETED_COMMENT = 2
           'is_content_visible', (r.rev_deleted & 1) = 0,  -- DELETED_TEXT = 1
           'is_editor_visible',  (r.rev_deleted & 4) = 0,  -- DELETED_USER = 4
           'is_minor_edit', CAST(r.rev_minor_edit AS BOOLEAN),
           'rev_dt', DATE_FORMAT(TO_TIMESTAMP(r.rev_timestamp, 'yyyyMMddHHmmss'), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),  -- HH (0-23), not kk (1-24)
           'rev_id', r.rev_id,
           'rev_parent_id', r.rev_parent_id,
           'rev_sha1', r.rev_sha1,
           'rev_size', r.rev_len
       ) as revision,
       'etwiki' AS wiki_id
FROM wmf_raw.mediawiki_revision r
INNER JOIN wmf_raw.mediawiki_private_comment c ON r.rev_comment_id = c.comment_id
INNER JOIN wmf_raw.mediawiki_page p ON r.rev_page = p.page_id
INNER JOIN wmf_raw.mediawiki_private_actor a ON r.rev_actor = a.actor_id
INNER JOIN wmf_raw.mediawiki_slots s ON r.rev_id = s.slot_revision_id
INNER JOIN wmf_raw.mediawiki_slot_roles sr ON s.slot_role_id = sr.role_id
INNER JOIN wmf_raw.mediawiki_content ct ON s.slot_content_id = ct.content_id
INNER JOIN wmf_raw.mediawiki_content_models cm ON ct.content_model = cm.model_id

WHERE r.snapshot = '2024-08'
  AND r.wiki_db = 'etwiki'
  
  AND c.snapshot = '2024-08'
  AND c.wiki_db = 'etwiki'
  
  AND p.snapshot = '2024-08'
  AND p.wiki_db = 'etwiki'
  
  AND a.snapshot = '2024-08'
  AND a.wiki_db = 'etwiki'

  AND s.snapshot = '2024-08'
  AND s.wiki_db = 'etwiki'

  AND sr.snapshot = '2024-08'
  AND sr.wiki_db = 'etwiki'

  AND ct.snapshot = '2024-08'
  AND ct.wiki_db = 'etwiki'
  
  AND cm.snapshot = '2024-08'
  AND cm.wiki_db = 'etwiki'
LIMIT 10
""").toJSON().collect()

The resulting JSON looks like this:

{
    "$schema": "/mediawiki/page/late/1.0.0",
    "comment": "\u00dcmbersuunamine lehele [[Hiie p\u00f5der]]",
    "dt": "2018-08-02T07:41:19.000Z",
    "page": {
        "namespace_id": 0,
        "page_id": 517279,
        "page_title": "Hiie_hirv"
    },
    "performer": {
        "user_id": 2108,
        "user_text": "Estopedist1"
    },
    "revision": {
        "comment": "\u00dcmbersuunamine lehele [[Hiie p\u00f5der]]",
        "content_slots": {
            "main": {
                "content_model": "wikitext",
                "content_sha1": "e1kgga2lh5n2kqzmnlgl5ersavrskcb",
                "content_size": 22,
                "origin_rev_id": 5063002,
                "slot_role": "main"
            }
        },
        "editor": {
            "user_id": 2108,
            "user_text": "Estopedist1"
        },
        "is_comment_visible": true,
        "is_content_visible": true,
        "is_editor_visible": true,
        "is_minor_edit": false,
        "rev_dt": "2018-08-02T07:41:19.000Z",
        "rev_id": 5063002,
        "rev_parent_id": 0,
        "rev_sha1": "e1kgga2lh5n2kqzmnlgl5ersavrskcb",
        "rev_size": 22
    },
    "wiki_id": "etwiki"
}
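Regarding the FIXME in the query (one map per slot instead of one map with all slots): since the query yields one row per (revision, slot), one way to get a single content_slots map is to merge rows after the fact. The following is a minimal sketch in plain Python over the collected JSON strings; the function name is hypothetical, and it only makes sense for small result sets like the LIMIT 10 above.

```python
import json


def merge_slot_rows(json_rows):
    """Merge one-row-per-slot JSON strings into one event per (wiki_id, rev_id)."""
    merged = {}
    for raw in json_rows:
        event = json.loads(raw)
        key = (event["wiki_id"], event["revision"]["rev_id"])
        if key not in merged:
            merged[key] = event
        else:
            # Same revision seen again: fold this row's slot into the existing map.
            merged[key]["revision"]["content_slots"].update(
                event["revision"]["content_slots"]
            )
    return list(merged.values())


def _row(rev_id, role):
    """Build a tiny illustrative row; real rows carry the full payload."""
    return json.dumps(
        {"wiki_id": "etwiki",
         "revision": {"rev_id": rev_id, "content_slots": {role: {"slot_role": role}}}}
    )


events = merge_slot_rows([_row(1, "main"), _row(1, "mediainfo"), _row(2, "main")])
```

For larger volumes the same merge would probably be better expressed in SQL, for example with a GROUP BY on the revision and MAP_FROM_ENTRIES(COLLECT_LIST(...)) over the slots, but I have not tried that here.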

Regarding content_format:

As per https://www.mediawiki.org/wiki/Manual:Revision_table#rev_content_format and https://www.mediawiki.org/wiki/Manual:Content_table#content_model, MW no longer persists the content format in the database. Instead, it is resolved by a software mechanism. This means I will not be able to get this info from the DB when generating reconcile events.

However, it looks like when we ask the MW API for the content of a slot, we do get the contentformat back, so we should be fine having the Flink job fetch this as well.

Example:

https://www.mediawiki.org/w/api.php?action=query&prop=revisions&titles=MediaWiki&rvlimit=5&rvprop=timestamp%7Cuser%7Ccomment%7Ccontentmodel%7Ccontent&rvslots=*

Output:

{
    "continue": {
        "rvcontinue": "20191003091610|3439858",
        "continue": "||"
    },
    "query": {
        "pages": {
            "1": {
                "pageid": 1,
                "ns": 0,
                "title": "MediaWiki",
                "revisions": [
                    {
                        "user": "Tropicalkitty",
                        "timestamp": "2023-12-29T18:14:25Z",
                        "slots": {
                            "main": {
                                "contentmodel": "wikitext",
                                "contentformat": "text/x-wiki",
                                "*": "{{Main page|{{zh other|zh|{{#ifexist:Template:Main page/{{int:lang}}|{{int:lang}}|en}}}}}}{{NOEXTERNALLANGLINKS}}<!--\n-- To edit what appears on this page,\n-- please see [[Template:Main_page]] \n-- https://www.mediawiki.org/wiki/Template:Main_page\n-->"
                            }
                        },
                        "comment": "Requested at [[Topic:Xw9xzifnfeunzvnn]]"
                    },
...
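As a minimal sketch of how an enrichment job might pull contentformat out of such a response: the helper names below are hypothetical, the URL builder mirrors the example query above with format=json added so the API returns plain JSON, and the parser assumes the response shape shown in the output (pages keyed by page id). No network call is made here.

```python
from urllib.parse import urlencode

API_URL = "https://www.mediawiki.org/w/api.php"


def build_revisions_url(title, limit=5, api_url=API_URL):
    """Build the same revisions query as the example, asking for JSON output."""
    params = {
        "action": "query",
        "format": "json",
        "prop": "revisions",
        "titles": title,
        "rvlimit": limit,
        "rvprop": "timestamp|user|comment|contentmodel|content",
        "rvslots": "*",
    }
    return api_url + "?" + urlencode(params)


def slot_content_formats(response):
    """Yield (page title, revision timestamp, slot role, contentformat) tuples."""
    for page in response.get("query", {}).get("pages", {}).values():
        for revision in page.get("revisions", []):
            for role, slot in revision.get("slots", {}).items():
                yield page["title"], revision["timestamp"], role, slot.get("contentformat")


# Trimmed copy of the response above, used instead of a live request.
sample = {
    "query": {"pages": {"1": {"pageid": 1, "ns": 0, "title": "MediaWiki", "revisions": [
        {"user": "Tropicalkitty", "timestamp": "2023-12-29T18:14:25Z",
         "slots": {"main": {"contentmodel": "wikitext", "contentformat": "text/x-wiki"}}}
    ]}}}
}
url = build_revisions_url("MediaWiki")
formats = list(slot_content_formats(sample))
```

A reconcile job would more likely look revisions up by rev_id than by title; the title-based query is kept only to stay close to the example.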

A note on source of truth:

Even though I used the monthly sqooped data lake tables in T374055#10131161 for convenience, all of the data comes from MediaWiki tables, so the same mechanism would also work on a daily basis if run against the Analytics Replicas.