all repos — nasg @ 558195288d9d6c5af4bf116c64e6537b01a268c8

webmention.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
import asyncio
import uvloop
import os
import hashlib
import json
import urllib.parse
import frontmatter
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
import validators
import arrow
from webmentiontools import urlinfo
import shared
import envelope


class WebmentionHandler(object):
    """Validate, fetch, store and announce one incoming webmention.

    The outcome is exposed as ``self.r``, a sanic response. It starts out as
    a generic 500 and is replaced by an error response when a check fails,
    or by a 202 once the mention has been written to disk.
    """

    def __init__(self, source, target):
        """Remember the request parameters and prepare the fallback reply.

        :param source: URL of the page that claims to link to ``target``
        :param target: URL that is being mentioned
        """
        self.source = source
        self.target = target
        self.now = arrow.utcnow().timestamp
        logging.info("incoming webmention %s => %s", self.source, self.target)

        # pessimistic default; whichever step decides the outcome replaces it
        self.r = sanic.response.text(
            "something went wrong on my side, could you please let me know at hello@petermolnar.eu ?",
            status=500
        )

    def run(self):
        """Process the webmention; the resulting response is left in ``self.r``."""
        if not self._validate():
            return

        # a failed fetch or a missing backlink has already set the error
        # response; saving/notifying would overwrite it or crash
        if not self._parse():
            return

        self._save()
        self._notify()

    def _validate(self):
        """Check both URLs and the domain rules without fetching anything.

        :returns: ``True`` when the request may be processed; ``False`` after
            ``self.r`` has been set to a 400 response.
        """
        # a sequence of pairs rather than a dict keyed by URL, so that
        # source == target does not collapse the two checks into one
        checks = (
            (self.source, '"source" parameter is an invalid URL'),
            (self.target, '"target" parameter is an invalid URL'),
        )
        for url, emsg in checks:
            logging.debug("validating URL %s", url)
            # `not url` covers a form field that was not sent at all (None)
            if not url or not validators.url(url):
                self.r = sanic.response.text(
                    emsg,
                    status=400
                )
                return False

        logging.debug("checking target domain")
        _target_domain = urllib.parse.urlparse(self.target).netloc
        # whitespace separated list of domains from the [site] config section
        _mydomains = shared.config.get('site', 'domains').split()
        if _target_domain not in _mydomains:
            self.r = sanic.response.text(
                "'target' is not in the list of allowed domains",
                status=400
            )
            return False

        logging.debug("checking selfpings")
        _source_domain = urllib.parse.urlparse(self.source).netloc
        if _source_domain in _mydomains:
            self.r = sanic.response.text(
                "selfpings are not allowed",
                status=400
            )
            return False

        return True

    def _parse(self):
        """Fetch source and target and verify that source links to target.

        On success ``self.source`` and ``self.target`` are replaced by the
        ``realurl`` reported by the fetcher (presumably the URL after
        redirects - confirm against webmentiontools).

        :returns: ``True`` on success; ``False`` after ``self.r`` has been
            set to an error response.
        """
        logging.debug("fetching %s", self.source)
        self._source = urlinfo.UrlInfo(self.source)
        if self._source.error:
            self.r = sanic.response.text(
                "couldn't fetch 'source' from %s" % (self.source),
                status=408
            )
            return False

        self.source = self._source.realurl
        if not self._source.linksTo(self.target):
            self.r = sanic.response.text(
                "'source' (%s) does not link to 'target' (%s)" % (
                    self.source,
                    self.target
                ),
                status=400
            )
            return False

        logging.debug("fetching %s", self.target)
        self._target = urlinfo.UrlInfo(self.target)
        if self._target.error:
            self.r = sanic.response.text(
                "couldn't fetch 'target' from %s" % (self.target),
                status=408
            )
            # do not go on to read realurl from a failed fetch
            return False

        self.target = self._target.realurl
        return True

    def _save(self):
        """Write the mention as a frontmatter document and accept the request.

        The file lives in the configured comments directory and is named
        after ``self.mhash``, so a mention with identical metadata overwrites
        its earlier copy.
        """
        doc = frontmatter.loads('')
        doc.metadata = self.meta
        doc.content = self.content
        target = os.path.join(
            shared.config.get('source', 'commentsdir'),
            self.mhash
        )
        if os.path.isfile(target):
            logging.warning('updating existing webmention %s', target)
        else:
            logging.warning('saving incoming webmention to %s', target)

        # explicit encoding: remote content must not depend on the locale
        with open(target, 'wt', encoding='utf-8') as t:
            t.write(frontmatter.dumps(doc))

        self.r = sanic.response.text(
            "accepted",
            status=202
        )

    def _notify(self):
        """Mail a summary of the mention to the configured recipient."""
        # go through the property so this does not rely on _save() having
        # populated the private cache first
        meta = self.meta
        author = meta['author']
        text = "# webmention\n## Source\n\nauthor\n:    %s\n\nURL\n:    %s\n\nemail\n:    %s\n\ndate\n:    %s\n\n## Target\n\nURL\n:    %s\n\n---\n\n%s" % (
            author.get('name', self.source),
            author.get('url', self.source),
            author.get('email', ''),
            meta['date'],
            self.target,
            self.content
        )

        l = envelope.Letter(
            sender=(
                shared.config.get('webmention', 'from_name'),
                shared.config.get('webmention', 'from_address')
            ),
            recipient=(
                shared.config.get('webmention', 'to_name'),
                shared.config.get('webmention', 'to_address')
            ),
            subject="[webmention] %s" % self.source,
            text=text
        )
        l.make()
        l.send()

    @property
    def mhash(self):
        """SHA-1 hex digest of the key-sorted JSON dump of ``self.meta``."""
        return hashlib.sha1(json.dumps(self.meta, sort_keys=True).encode('utf-8')).hexdigest()

    @property
    def meta(self):
        """Metadata of the mention, built on first access and then cached.

        Because of the caching, ``source`` and ``target`` hold whatever the
        attributes contained at the time of the first access.
        """
        if hasattr(self, '_meta'):
            return self._meta

        self._meta = {
            'author': self._source.author(),
            'type': self._source.relationType(),
            'target': self.target,
            'source': self.source,
            'date': arrow.get(self._source.pubDate()).format(shared.ARROWISO),
        }
        return self._meta

    @property
    def content(self):
        """Content of the source page run through ``shared.Pandoc``; cached."""
        if hasattr(self, '_content'):
            return self._content

        # from HTML to Markdown
        self._content = shared.Pandoc(False).convert(self._source.content())
        return self._content


if __name__ == '__main__':
    # serve the webmention endpoint on localhost with uvloop as event loop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = Sanic()

    @app.route("/webmention", methods=["POST"])
    async def wm(request):
        """Webmention endpoint: reads the form fields ``source`` and ``target``.

        Returns whatever response the handler left in its ``r`` attribute.
        """
        source = request.form.get('source')
        target = request.form.get('target')
        handler = WebmentionHandler(source, target)
        # run() is synchronous (it fetches URLs, writes a file and sends
        # mail); hand it to the default executor so the event loop keeps
        # serving other requests in the meantime
        await asyncio.get_event_loop().run_in_executor(None, handler.run)
        return handler.r

    app.run(host="127.0.0.1", port=8002, debug=True)