all repos — nasg @ 9e0e58a4c6181db1418b58ce5973b46c72237d32

wayback.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
# Module metadata: authorship, copyright, licence and maintainer contact.
__author__ = "Peter Molnar"
__copyright__ = "Copyright 2017-2019, Peter Molnar"
__license__ = "apache-2.0"
__maintainer__ = "Peter Molnar"
__email__ = "mail@petermolnar.net"

import re
import json
import os
import logging
import requests
from collections import deque
from urllib.parse import urlparse
import settings
import arrow
from time import sleep

# Dedicated "wayback" logger that reports everything from DEBUG upwards.
logger = logging.getLogger("wayback")
logger.setLevel(logging.DEBUG)

# One stream handler with a timestamped "time - name - level - message" layout.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

from pprint import pprint

# Matches the timemap (link-format) line describing the earliest snapshot, e.g.
#   <http://web.archive.org/web/2010.../http://example.com/>; rel="first memento"; datetime="Mon, 01 Feb 2010 10:00:00 GMT",
# and captures the snapshot URL (``url``) and its date string (``datetime``).
# Relation types may be combined, so a resource with a single snapshot can be
# tagged ``rel="first last memento"``; that form is accepted as well.
RE_FIRST = re.compile(
    r"^\<(?P<url>[^>]+)\>; rel=\"first (?:last )?memento\"; "
    r"datetime=\"(?P<datetime>[^\"]+).*$"
)


class FindWaybackURL(object):
    """Look up the oldest archive.org snapshot of a post.

    Every URL the post may have lived under (current and former domains,
    current and former categories, redirect paths) is queried against the
    Wayback Machine timemap endpoint; the earliest "first memento" found is
    kept in ``self.oldest`` (its URL) and ``self.epoch`` (its Unix timestamp).

    Args:
        path: path segment (slug) of the post, used in the candidate URLs.
        category: current category of the post; also used to look up former
            category names in ``settings.formercategories``.
        redirects: optional iterable of other path segments the post was
            reachable under. It is copied, never modified.
    """

    # Seconds to wait on a single archive.org request before giving up.
    REQUEST_TIMEOUT = 30
    # Pause between two consecutive archive.org requests, in seconds.
    REQUEST_DELAY = 0.5

    def __init__(self, path, category="", redirects=None):
        self.path = path
        self.category = category
        # Copy the input: a shared default list or the caller's own list must
        # not be changed by this instance.
        self.redirects = list(redirects) if redirects else []
        # Start from "now"; any snapshot found is older and will replace it.
        self.epoch = self._timestamp(arrow.utcnow())
        self.oldest = ""

    @staticmethod
    def _timestamp(moment):
        """Return the integer Unix timestamp of an arrow object.

        ``timestamp`` is a property in arrow releases before 1.0 and a
        method from 1.0 onwards; both are supported here.
        """
        value = moment.timestamp
        if callable(value):
            value = value()
        return int(value)

    def possible_urls(self):
        """Return the de-duplicated list of URLs the post may have had.

        Combines every known path (redirects plus the current one) with
        every known domain and category. ``settings`` is only read here;
        local copies are extended instead of the shared lists.

        Returns:
            list of URL strings, in first-generated order.
        """
        site = settings.site.name
        paths = [*self.redirects, self.path]
        domains = [*settings.formerdomains, site]
        if self.category in settings.formercategories:
            former = settings.formercategories[self.category]
        else:
            former = []
        categories = [*former, self.category]

        # A dict is used as an insertion-ordered set.
        q = {}
        for path in paths:
            q[f"http://{site}/{path}/"] = True
            q[f"http://{site}/{path}/index.html"] = True
            for domain in domains:
                q[f"http://{domain}/{path}/"] = True
                for category in categories:
                    q[f"http://{domain}/{category}/{path}/"] = True
                    q[
                        f"http://{domain}/category/{category}/{path}/"
                    ] = True
        return list(q)

    def get_first_memento(self, url):
        """Ask archive.org for the earliest snapshot of ``url``.

        Args:
            url: the original URL to look up.

        Returns:
            A ``settings.nameddict`` with ``epoch`` (Unix timestamp of the
            snapshot, UTC) and ``url`` (snapshot URL), or ``None`` when the
            request failed or the response lists no first memento.

        Raises:
            requests.RequestException: on connection errors or when
                archive.org does not answer within ``REQUEST_TIMEOUT``.
        """
        target = f"http://web.archive.org/web/timemap/link/{url}"
        logger.info("requesting %s", url)
        # Without a timeout a stalled connection would block forever.
        mementos = requests.get(target, timeout=self.REQUEST_TIMEOUT)
        if mementos.status_code != requests.codes.ok:
            logger.warning(
                "request failed: %s, status: %s, txt: %s",
                mementos,
                mementos.status_code,
                mementos.text,
            )
            return None

        if not mementos.text:
            logger.debug("empty memento response for %s", target)
            return None

        for memento in mementos.text.split("\n"):
            m = RE_FIRST.match(memento)
            if not m:
                continue
            moment = arrow.get(
                m.group("datetime"),
                "ddd, DD MMM YYYY HH:mm:ss ZZZ",
            ).to("utc")
            r = settings.nameddict(
                {
                    "epoch": self._timestamp(moment),
                    "url": m.group("url"),
                }
            )
            logger.info("found memento candidate: %s", r)
            return r

        # Reported once per response, not once per non-matching line.
        logger.debug("no first memento found at: %s", target)
        return None

    def run(self):
        """Query every candidate URL and remember the oldest snapshot.

        Updates ``self.epoch`` and ``self.oldest`` in place and logs the
        outcome; nothing is returned.
        """
        candidates = self.possible_urls()
        logger.info("running archive.org lookup for %s", self.path)
        for url in candidates:
            maybe = self.get_first_memento(url)
            if maybe and maybe.epoch < self.epoch:
                self.epoch = maybe.epoch
                self.oldest = maybe.url
            # Be polite to archive.org between requests.
            sleep(self.REQUEST_DELAY)
        if not self.oldest:
            logger.error("no memento found for %s", self.path)
        else:
            logger.info(
                "\t\toldest found memento for %s: %s :: %s",
                self.path,
                str(arrow.get(self.epoch)),
                self.oldest,
            )