67f2978aeb
It's also for better backwards compatibility, though <figure> and <figcaption> are going to stay, because those make a lot of sense. Coming up next: out with JSON-LD. It was an interesting experiment, but it brought nothing good at all, even in the long run.
2759 lines
80 KiB
Python
#!/usr/bin/env python3

__author__ = "Peter Molnar"
__copyright__ = "Copyright 2017-2019, Peter Molnar"
__license__ = "apache-2.0"
__maintainer__ = "Peter Molnar"
__email__ = "mail@petermolnar.net"

import glob
import os
import time
import re
import asyncio
import sqlite3
import json

from shutil import copy2 as cp
from shutil import rmtree
from shutil import copyfileobj
from urllib.parse import urlparse
from collections import namedtuple
import logging

import arrow
import langdetect
import wand.image
import filetype
import jinja2
import yaml

import frontmatter
from feedgen.feed import FeedGenerator
from feedgen.entry import FeedEntry

from slugify import slugify
import requests

from pandoc import PandocMD2HTML, PandocMD2TXT, PandocHTML2TXT
from meta import Exif
import settings
import keys
import wayback

logger = logging.getLogger("NASG")

MarkdownImage = namedtuple(
    "MarkdownImage", ["match", "alt", "fname", "title", "css"]
)

RE_MDIMG = re.compile(
    r"(?P<match>!\[(?P<alt>[^\]]+)?\]\((?P<fname>[^\s\]]+)"
    r"(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:{(?P<css>[^\}]+)\})?)",
    re.IGNORECASE,
)
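
# Illustrative note, not part of the original file: RE_MDIMG matches Markdown
# image syntax with an optional trailing attribute block, e.g.
#   ![alt text](image.jpg "a title"){.someclass}
# which captures alt="alt text", fname="image.jpg", title="a title",
# css=".someclass".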

RE_CODE = re.compile(r"^(?:[~`]{3,4}).+$", re.MULTILINE)

RE_PRECODE = re.compile(r'<pre class="([^"]+)"><code>')

RE_MYURL = re.compile(
    r'(^(%s[^"]+)$|"(%s[^"]+)")'
    % (settings.site.url, settings.site.url)
)


def mtime(path):
    """ return seconds level mtime or 0 (chomp microsecs) """
    if os.path.exists(path):
        return int(os.path.getmtime(path))
    return 0


def utfyamldump(data):
    """ dump YAML with actual UTF-8 chars """
    return yaml.dump(
        data, default_flow_style=False, indent=4, allow_unicode=True
    )


def url2slug(url, limit=200):
    """ convert URL to max 200 char ASCII string """
    url = re.sub(r"^https?://(?:www)?", "", url)
    url = slugify(url, only_ascii=True, lower=True)
    return url[:limit]
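
# Illustrative example, not part of the original file; the exact output
# depends on the slugify backend in use:
#   url2slug("https://petermolnar.net/article/?b=2")
# would yield something like "petermolnar-net-article-b-2".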


def rfc3339todt(rfc3339):
    """ nice dates for humans """
    t = arrow.get(rfc3339).format("YYYY-MM-DD HH:mm ZZZ")
    return str(t)


def extractlicense(url):
    """ extract license name """
    n, e = os.path.splitext(os.path.basename(url))
    return n.upper()


def relurl(text, baseurl=None):
    if not baseurl:
        baseurl = settings.site.url
    for match, standalone, href in RE_MYURL.findall(text):
        needsquotes = False
        if len(href):
            needsquotes = True
            url = href
        else:
            url = standalone

        r = os.path.relpath(url, baseurl)
        if url.endswith("/") and not r.endswith("/"):
            r = "%s/%s" % (r, settings.filenames.html)
        if needsquotes:
            r = '"%s"' % r
        logger.debug("RELURL: %s => %s (base: %s)", match, r, baseurl)
        text = text.replace(match, r)
    return text
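
# Illustrative example, not part of the original file, assuming
# settings.filenames.html is "index.html": rendering with
# baseurl="https://petermolnar.net/article/" would rewrite
#   "https://petermolnar.net/category/photo/"
# into the relative form "../category/photo/index.html".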


def writepath(fpath, content, mtime=0):
    """ f.write with extras """
    d = os.path.dirname(fpath)
    if not os.path.isdir(d):
        logger.debug("creating directory tree %s", d)
        os.makedirs(d)
    if isinstance(content, str):
        mode = "wt"
    else:
        mode = "wb"
    with open(fpath, mode) as f:
        logger.info("writing file %s", fpath)
        f.write(content)
    if mtime > 0:
        os.utime(fpath, (mtime, mtime))


def maybe_copy(source, target):
    """ copy only if the target mtime is older than the source mtime """
    if os.path.exists(target) and mtime(source) <= mtime(target):
        return
    logger.info("copying '%s' to '%s'", source, target)
    cp(source, target)


def extractdomain(url):
    url = urlparse(url)
    return url.hostname


J2 = jinja2.Environment(
    loader=jinja2.FileSystemLoader(
        searchpath=settings.paths.get("tmpl")
    ),
    lstrip_blocks=True,
    trim_blocks=True,
)
J2.filters["relurl"] = relurl
J2.filters["url2slug"] = url2slug
J2.filters["printdate"] = rfc3339todt
J2.filters["extractlicense"] = extractlicense
J2.filters["extractdomain"] = extractdomain


class cached_property(object):
    """ extremely simple cached_property decorator:
    whenever something is called as @cached_property, on first run the
    result is calculated, then the class method is overwritten by a
    property containing the result from the method
    """

    def __init__(self, method, name=None):
        self.method = method
        self.name = name or method.__name__

    def __get__(self, inst, cls):
        if inst is None:
            return self
        result = self.method(inst)
        setattr(inst, self.name, result)
        return result
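
# Illustrative usage, not part of the original file: the first access runs
# the method and shadows it with a plain instance attribute, so later
# lookups skip the descriptor entirely:
#
#   class Post:
#       @cached_property
#       def meta(self):
#           print("expensive parse")
#           return {"title": "hello"}
#
#   p = Post()
#   p.meta  # prints "expensive parse" and returns the dict
#   p.meta  # returns the cached dict without recomputing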


class AQ:
    """ Async queue which starts execution right on population """

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=self.loop)

    def put(self, task):
        # Queue.put is a coroutine; from synchronous code the item has to
        # be added with put_nowait, otherwise it never lands in the queue
        self.queue.put_nowait(asyncio.ensure_future(task))

    async def consume(self):
        while not self.queue.empty():
            item = await self.queue.get()
            self.queue.task_done()
            # asyncio.gather() ?

    def run(self):
        consumer = asyncio.ensure_future(self.consume())
        self.loop.run_until_complete(consumer)
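
# Illustrative usage, not part of the original file: because put() wraps
# each task in a future immediately, work starts as soon as the loop runs;
# run() merely drains the queue and waits:
#
#   queue = AQ()
#   for post in posts:
#       queue.put(post.render())
#   queue.run()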


class Gone(object):
    """
    Gone object for deleted entries
    """

    def __init__(self, fpath):
        self.fpath = fpath

    @property
    def mtime(self):
        return mtime(self.fpath)

    @property
    def exists(self):
        if (
            os.path.exists(self.renderfile)
            and mtime(self.renderfile) >= self.mtime
        ):
            return True
        return False

    @property
    def renderdir(self):
        return os.path.join(settings.paths.get("build"), self.source)

    @property
    def renderfile(self):
        return os.path.join(self.renderdir, settings.filenames.html)

    @property
    def source(self):
        source, fext = os.path.splitext(os.path.basename(self.fpath))
        return source

    @property
    def template(self):
        return "%s.j2.html" % (self.__class__.__name__)

    @property
    def tmplvars(self):
        return {"source": self.source}

    async def render(self):
        return

class FediverseStats(object):
    def __init__(self, postcount=0, commentcount=0):
        self.postcount = postcount
        self.commentcount = commentcount
        self.nodeinfo_json = "nodeinfo.json"

    async def render(self):
        writepath(
            os.path.join(
                settings.paths.build, ".well-known", "nodeinfo"
            ),
            json.dumps(self.metanodeinfo, indent=4, ensure_ascii=False),
        )
        writepath(
            os.path.join(
                settings.paths.build, ".well-known", self.nodeinfo_json
            ),
            json.dumps(self.nodeinfo, indent=4, ensure_ascii=False),
        )

    @property
    def metanodeinfo(self):
        return {
            "links": [
                {
                    "href": f"{settings.site.url}/.well-known/{self.nodeinfo_json}",
                    "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
                }
            ]
        }

    @property
    def nodeinfo(self):
        return {
            "metadata": {
                "nodeName": settings.site.name,
                "software": {
                    "homepage": "https://github.com/petermolnar/nasg",
                    "github": "https://github.com/petermolnar/nasg",
                    "follow": f"{settings.site.url}",
                },
                "email": "webmaster@petermolnar.net",
            },
            "openRegistrations": False,
            "protocols": ["activitypub"],
            "services": {"inbound": [], "outbound": []},
            "software": {"name": "nasg", "version": "6.6"},
            "usage": {
                "localPosts": self.postcount,
                "localComments": self.commentcount,
                "users": {
                    "total": 1,
                    "activeHalfyear": 1,
                    "activeMonth": 1,
                },
            },
            "version": "2.0",
        }


class Redirect(Gone):
    """
    Redirect object for entries that moved
    """

    @cached_property
    def target(self):
        target = ""
        with open(self.fpath, "rt") as f:
            target = f.read().strip()
        return target

    @property
    def tmplvars(self):
        return {"source": self.source, "target": self.target}


class MarkdownDoc(object):
    """ Base class for anything that is stored as .md """

    def __init__(self, fpath):
        self.fpath = fpath

    @property
    def mtime(self):
        return mtime(self.fpath)

    @property
    def dt(self):
        """ returns an arrow object; tries to get the published date of the
        markdown doc. The pubdate can be in the future, which is why it's
        done the way it is """
        maybe = arrow.get(self.mtime)
        for key in ["published", "date"]:
            t = self.meta.get(key, None)
            if t and "null" != t:
                try:
                    t = arrow.get(t)
                    if t.timestamp > maybe.timestamp:
                        maybe = t
                except Exception as e:
                    logger.error(
                        "failed to parse date: %s for key %s in %s",
                        t,
                        key,
                        self.fpath,
                    )
                    continue
        return maybe
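
    # Illustrative note, not part of the original file: the file mtime is the
    # baseline, and any parseable "published" or "date" frontmatter value
    # that is newer wins, so a post scheduled for the future keeps its future
    # timestamp instead of the mtime of its last edit.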

    @cached_property
    def _parsed(self):
        with open(self.fpath, mode="rt") as f:
            logger.debug("parsing YAML+MD file %s", self.fpath)
            meta, txt = frontmatter.parse(f.read())
        return (meta, txt)

    @cached_property
    def meta(self):
        return self._parsed[0]

    @cached_property
    def content(self):
        maybe = self._parsed[1]
        if not maybe or not len(maybe):
            maybe = str("")
        return maybe

    @cached_property
    def html_content(self):
        if not len(self.content):
            return self.content

        c = self.content
        if hasattr(self, "images") and len(self.images):
            for match, img in self.images.items():
                if self.is_photo:
                    c = c.replace(match, "")
                else:
                    c = c.replace(match, str(img))
        c = str(PandocMD2HTML(c))
        c = RE_PRECODE.sub(
            '<pre><code lang="\g<1>" class="language-\g<1>">', c
        )
        return c

    @cached_property
    def txt_content(self):
        if not len(self.content):
            return ""
        else:
            return PandocMD2TXT(self.content)


class Comment(MarkdownDoc):
    @property
    def dt(self):
        maybe = self.meta.get("date")
        if not maybe or maybe == "null":
            maybe = int(os.path.basename(self.fpath).split("-")[0])
        dt = arrow.get(maybe)
        if self.mtime != dt.timestamp:
            os.utime(self.fpath, (dt.timestamp, dt.timestamp))
        return dt

    @property
    def source(self):
        return self.meta.get("source")

    @property
    def author(self):
        r = {
            "@context": "http://schema.org",
            "@type": "Person",
            "name": urlparse(self.source).hostname,
            "url": self.source,
        }
        author = self.meta.get("author")
        if not author:
            return r
        if "name" in author:
            r.update({"name": self.meta.get("author").get("name")})
        elif "url" in author:
            r.update(
                {
                    "name": urlparse(
                        self.meta.get("author").get("url")
                    ).hostname
                }
            )
        return r

    @property
    def type(self):
        return self.meta.get("type", "webmention")

    @cached_property
    def jsonld(self):
        r = {
            "@context": "http://schema.org",
            "@type": "Comment",
            "author": self.author,
            "url": self.source,
            "discussionUrl": self.meta.get("target"),
            "datePublished": str(self.dt),
            "disambiguatingDescription": self.type,
        }
        return r


class WebImage(object):
    def __init__(self, fpath, mdimg, parent):
        logger.debug("loading image: %s", fpath)
        self.mdimg = mdimg
        self.fpath = fpath
        self.parent = parent
        self.mtime = mtime(self.fpath)
        self.name = os.path.basename(self.fpath)
        self.fname, self.fext = os.path.splitext(self.name)
        self.resized_images = [
            (k, self.Resized(self, k))
            for k in settings.photo.get("sizes").keys()
            if k < max(self.width, self.height)
        ]
        if not len(self.resized_images):
            self.resized_images.append(
                (
                    max(self.width, self.height),
                    self.Resized(self, max(self.width, self.height)),
                )
            )

    @property
    def is_mainimg(self):
        if self.fname == self.parent.name:
            return True
        return False

    @property
    def jsonld(self):
        r = {
            "@context": "http://schema.org",
            "@type": "ImageObject",
            "url": self.href,
            "image": self.href,
            "thumbnail": settings.nameddict(
                {
                    "@context": "http://schema.org",
                    "@type": "ImageObject",
                    "url": self.src,
                    "width": self.displayed.width,
                    "height": self.displayed.height,
                }
            ),
            "name": self.name,
            "encodingFormat": self.mime_type,
            "contentSize": self.mime_size,
            "width": self.linked.width,
            "height": self.linked.height,
            "dateCreated": self.exif.get("CreateDate"),
            "exifData": [],
            "caption": self.caption,
            "headline": self.title,
            "representativeOfPage": False,
            # "text": str(self)
        }
        for k, v in self.exif.items():
            r["exifData"].append(
                {"@type": "PropertyValue", "name": k, "value": v}
            )
        if self.is_photo:
            licence = settings.licence["_default"]
            r.update(
                {
                    "creator": settings.author,
                    "copyrightHolder": settings.author,
                    "license": f"https://spdx.org/licenses/{licence}.html",
                }
            )
        if self.is_mainimg:
            r.update({"representativeOfPage": True})

        if (
            self.exif["GPSLatitude"] != 0
            and self.exif["GPSLongitude"] != 0
        ):
            r.update(
                {
                    "locationCreated": settings.nameddict(
                        {
                            "@context": "http://schema.org",
                            "@type": "Place",
                            "geo": settings.nameddict(
                                {
                                    "@context": "http://schema.org",
                                    "@type": "GeoCoordinates",
                                    "latitude": round(
                                        self.exif["GPSLatitude"], 4
                                    ),
                                    "longitude": round(
                                        self.exif["GPSLongitude"], 4
                                    ),
                                }
                            ),
                        }
                    )
                }
            )
        return settings.nameddict(r)

    def __str__(self):
        if len(self.mdimg.css):
            return self.mdimg.match
        tmpl = J2.get_template("%s.j2.html" % (self.__class__.__name__))
        return tmpl.render(self.jsonld)

    @cached_property
    def meta(self):
        return Exif(self.fpath)

    @property
    def caption(self):
        if len(self.mdimg.alt):
            return self.mdimg.alt
        else:
            return self.meta.get("Description", "")

    @property
    def title(self):
        if len(self.mdimg.title):
            return self.mdimg.title
        else:
            return self.meta.get("Headline", self.fname)

    @property
    def tags(self):
        return list(set(self.meta.get("Subject", [])))

    @property
    def published(self):
        return arrow.get(
            self.meta.get("ReleaseDate", self.meta.get("ModifyDate"))
        )

    @property
    def width(self):
        return int(self.meta.get("ImageWidth"))

    @property
    def height(self):
        return int(self.meta.get("ImageHeight"))

    @property
    def mime_type(self):
        return str(self.meta.get("MIMEType", "image/jpeg"))

    @property
    def mime_size(self):
        try:
            size = os.path.getsize(self.linked.fpath)
        except Exception as e:
            logger.error(
                "Failed to get mime size of %s", self.linked.fpath
            )
            size = self.meta.get("FileSize", 0)
        return size

    @property
    def displayed(self):
        ret = self.resized_images[0][1]
        for size, r in self.resized_images:
            if size == settings.photo.get("default"):
                ret = r
        return ret

    @property
    def linked(self):
        m = 0
        ret = self.resized_images[0][1]
        for size, r in self.resized_images:
            if size > m:
                m = size
                ret = r
        return ret

    @property
    def src(self):
        return self.displayed.url

    @property
    def href(self):
        return self.linked.url

    @property
    def is_photo(self):
        r = settings.photo.get("re_author", None)
        if not r:
            return False
        cpr = self.meta.get("Copyright", "")
        art = self.meta.get("Artist", "")
        # both Artist and Copyright missing from EXIF
        if not cpr and not art:
            return False
        # we have regex, Artist and Copyright, try matching them
        if r.search(cpr) or r.search(art):
            return True
        return False

    @property
    def exif(self):
        exif = {
            "Model": "",
            "FNumber": "",
            "ExposureTime": "",
            "FocalLength": "",
            "ISO": "",
            "LensID": "",
            "CreateDate": str(arrow.get(self.mtime)),
            "GPSLatitude": 0,
            "GPSLongitude": 0,
        }
        if not self.is_photo:
            return exif

        mapping = {
            "Model": ["Model"],
            "FNumber": ["FNumber", "Aperture"],
            "ExposureTime": ["ExposureTime"],
            "FocalLength": ["FocalLength"],
            "ISO": ["ISO"],
            "LensID": ["LensID", "LensSpec", "Lens"],
            "CreateDate": ["CreateDate", "DateTimeOriginal"],
            "GPSLatitude": ["GPSLatitude"],
            "GPSLongitude": ["GPSLongitude"],
        }

        for ekey, candidates in mapping.items():
            for candidate in candidates:
                maybe = self.meta.get(candidate, None)
                if not maybe:
                    continue
                else:
                    exif[ekey] = maybe
                break
        return settings.nameddict(exif)

    def _maybe_watermark(self, img):
        if not self.is_photo:
            return img

        wmarkfile = settings.paths.get("watermark")
        if not os.path.exists(wmarkfile):
            return img

        with wand.image.Image(filename=wmarkfile) as wmark:
            w = self.height * 0.2
            h = wmark.height * (w / wmark.width)
            if self.width > self.height:
                x = self.width - w - (self.width * 0.01)
                y = self.height - h - (self.height * 0.01)
            else:
                x = self.width - h - (self.width * 0.01)
                y = self.height - w - (self.height * 0.01)

            w = round(w)
            h = round(h)
            x = round(x)
            y = round(y)

            wmark.resize(w, h)
            if self.width <= self.height:
                wmark.rotate(-90)
            img.composite(image=wmark, left=x, top=y)
        return img

    async def downsize(self):
        need = False
        for size, resized in self.resized_images:
            if not resized.exists or settings.args.get("regenerate"):
                need = True
                break
        if not need:
            return

        with wand.image.Image(filename=self.fpath) as img:
            img.auto_orient()
            img = self._maybe_watermark(img)
            for size, resized in self.resized_images:
                if not resized.exists or settings.args.get(
                    "regenerate"
                ):
                    logger.info(
                        "resizing image: %s to size %d",
                        os.path.basename(self.fpath),
                        size,
                    )
                    await resized.make(img)

    class Resized:
        def __init__(self, parent, size, crop=False):
            self.parent = parent
            self.size = size
            self.crop = crop

        @property
        def suffix(self):
            return settings.photo.get("sizes").get(self.size, "")

        @property
        def fname(self):
            return "%s%s%s" % (
                self.parent.fname,
                self.suffix,
                self.parent.fext,
            )

        @property
        def fpath(self):
            return os.path.join(
                self.parent.parent.renderdir, self.fname
            )

        @property
        def url(self):
            return "%s/%s/%s" % (
                settings.site.get("url"),
                self.parent.parent.name,
                "%s%s%s"
                % (self.parent.fname, self.suffix, self.parent.fext),
            )

        @property
        def relpath(self):
            return "%s/%s" % (
                self.parent.parent.renderdir.replace(
                    settings.paths.get("build"), ""
                ),
                self.fname,
            )

        @property
        def exists(self):
            if os.path.isfile(self.fpath):
                if mtime(self.fpath) >= self.parent.mtime:
                    return True
            return False

        @property
        def width(self):
            return self.dimensions[0]

        @property
        def height(self):
            return self.dimensions[1]

        @property
        def dimensions(self):
            width = self.parent.width
            height = self.parent.height
            size = self.size

            ratio = max(width, height) / min(width, height)
            horizontal = True if (width / height) >= 1 else False

            # panorama: reverse "horizontal", because the size limit should
            # apply to the shorter side, not the longer one, and shrink the
            # limit a little below the actual setting
            # 2.39:1 is the widescreen cinematic ratio: anything wider than
            # that is panorama land
            if ratio > 2.4 and not self.crop:
                size = int(size * 0.6)
                horizontal = not horizontal

            if (horizontal and not self.crop) or (
                not horizontal and self.crop
            ):
                w = size
                h = int(float(size / width) * height)
            else:
                h = size
                w = int(float(size / height) * width)
            return (w, h)
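
        # Illustrative worked example, not part of the original file: for a
        # 4000x3000 landscape source and size=1200, the ratio is ~1.33 (not
        # a panorama) and horizontal is True, so w=1200 and
        # h = int(1200 / 4000 * 3000) = 900. For a 6000x2000 panorama, size
        # drops to 720 and the limit flips to the short side: h=720,
        # w = int(720 / 2000 * 6000) = 2160.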

        async def make(self, original):
            if not os.path.isdir(os.path.dirname(self.fpath)):
                os.makedirs(os.path.dirname(self.fpath))

            with original.clone() as thumb:
                thumb.resize(self.width, self.height)

                if self.crop:
                    thumb.liquid_rescale(self.size, self.size, 1, 1)

                if (
                    self.parent.meta.get("FileType", "jpeg").lower()
                    == "jpeg"
                ):
                    thumb.compression_quality = 88
                    thumb.unsharp_mask(
                        radius=1, sigma=0.5, amount=0.7, threshold=0.5
                    )
                    thumb.format = "pjpeg"

                # this is to make sure pjpeg happens
                with open(self.fpath, "wb") as f:
                    logger.info("writing %s", self.fpath)
                    thumb.save(file=f)


class Singular(MarkdownDoc):
    """
    A Singular object: a complete representation of a post, including
    all its comments, files, images, etc.
    """

    def __init__(self, fpath):
        self.fpath = fpath
        self.dirpath = os.path.dirname(fpath)
        self.name = os.path.basename(self.dirpath)
        self.category = os.path.basename(os.path.dirname(self.dirpath))

    @cached_property
    def files(self):
        """
        An array of files present at the same directory level as
        the Singular object, excluding hidden (starting with .) and
        markdown (ending with .md) files
        """
        return [
            k
            for k in glob.glob(os.path.join(self.dirpath, "*.*"))
            if not k.startswith(".")
        ]

    @cached_property
    def comments(self):
        """
        A dict of Comment objects keyed by their timestamp, populated
        from the same directory level as the Singular object
        """
        comments = {}
        candidates = glob.glob(os.path.join(self.dirpath, "*.md"))
        for candidate in candidates:
            if os.path.basename(candidate) == settings.filenames.md:
                continue
            if candidate.startswith("."):
                continue
            comment = Comment(candidate)
            comments[comment.dt.timestamp] = comment
        return comments

    @cached_property
    def images(self):
        """
        A dict of WebImage objects, populated by:
        - images that are present in the Markdown content
        - and have an actual image file at the same directory level as
          the Singular object
        """
        images = {}
        for match, alt, fname, title, css in RE_MDIMG.findall(
            self.content
        ):
            mdimg = MarkdownImage(match, alt, fname, title, css)
            imgpath = os.path.join(self.dirpath, fname)
            if imgpath in self.files:
                kind = filetype.guess(imgpath)
                if kind and "image" in kind.mime.lower():
                    images.update(
                        {match: WebImage(imgpath, mdimg, self)}
                    )
            else:
                logger.error(
                    "Missing image: %s, referenced in %s",
                    imgpath,
                    self.fpath,
                )
                continue
        return images

    @property
    def summary(self):
        return str(self.meta.get("summary", ""))

    @cached_property
    def html_summary(self):
        if not len(self.summary):
            return ""
        else:
            return PandocMD2HTML(self.summary)

    @cached_property
    def txt_summary(self):
        if not len(self.summary):
            return ""
        else:
            return PandocMD2TXT(self.summary)

    @property
    def published(self):
        # ok, so here's a hack: because I have no idea when my older photos
        # were actually published, any photo from before 2014 will have
        # the EXIF createdate as publish date
        pub = arrow.get(self.meta.get("published"))
        if self.is_photo:
            maybe = arrow.get(self.photo.exif.get("CreateDate"))
            if maybe.year < settings.photo.earlyyears:
                pub = maybe
        return pub

    @property
    def updated(self):
        if "updated" in self.meta:
            return arrow.get(self.meta.get("updated"))
        else:
            return self.dt

    @property
    def sameas(self):
        r = {}
        for k in glob.glob(os.path.join(self.dirpath, "*.copy")):
            with open(k, "rt") as f:
                r.update({f.read(): True})
        return list(r.keys())

    @property
    def is_page(self):
        """ all the categories starting with _ are pages """
        if self.category.startswith("_"):
            return True
        return False

    @property
    def is_front(self):
        if self.is_reply:
            return False
        # if self.category in settings.notinfeed:
        #     return False
        return True

    @property
    def is_photo(self):
        """
        This is true if there is exactly one image, its file shares the
        name of the entry's directory (that is, its slug), and that image
        believes it is a photo.
        """
        if len(self.images) != 1:
            return False
        photo = next(iter(self.images.values()))
        maybe = self.fpath.replace(
            settings.filenames.md, "%s.jpg" % (self.name)
        )
        if photo.fpath == maybe:
            return True
        return False

    @property
    def is_reply(self):
        return self.meta.get("in-reply-to", False)

    @property
    def is_future(self):
        if self.published.timestamp > arrow.utcnow().timestamp:
            return True
        return False

    @property
    def photo(self):
        if not self.is_photo:
            return None
        return next(iter(self.images.values()))

    @property
    def title(self):
        if self.is_reply:
            return "RE: %s" % self.is_reply
        return self.meta.get(
            "title", self.published.format(settings.displaydate)
        )

    @property
    def tags(self):
        return self.meta.get("tags", [])

    def baseN(
        self, num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"
    ):
        """
        Creates short, lowercase slug for a number (an epoch) passed
        """
        num = int(num)
        return ((num == 0) and numerals[0]) or (
            self.baseN(num // b, b, numerals).lstrip(numerals[0])
            + numerals[num % b]
        )
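
    # Illustrative example, not part of the original file: baseN is a
    # recursive base-36 encoder, so an epoch like 1577836800
    # (2020-01-01T00:00:00Z) becomes the short slug "q3eio0".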

    @property
    def shortslug(self):
        return self.baseN(self.published.timestamp)

    @property
    def to_syndicate(self):
        urls = self.meta.get("syndicate", [])
        if not self.is_page:
            urls.append("https://fed.brid.gy/")
            # urls.append("https://brid.gy/publish/mastodon")
            if self.is_photo:
                urls.append("https://brid.gy/publish/flickr")
        return urls

    @property
    def to_ping(self):
        webmentions = []
        for url in self.to_syndicate:
            w = Webmention(
                self.url,
                url,
                os.path.dirname(self.fpath),
                self.dt.timestamp,
            )
            webmentions.append(w)
        if self.is_reply:
            w = Webmention(
                self.url,
                self.is_reply,
                os.path.dirname(self.fpath),
                self.dt.timestamp,
            )
            webmentions.append(w)
        return webmentions

    @property
    def licence(self):
        k = "_default"
        if self.category in settings.licence:
            k = self.category
        return settings.licence[k]

    @property
    def lang(self):
        lang = "en"
        try:
            lang = langdetect.detect(
                "\n".join([self.meta.get("title", ""), self.content])
            )
        except BaseException:
            pass
        return lang

    @property
    def url(self):
        return "%s/%s/" % (settings.site.get("url"), self.name)

    @property
    def has_code(self):
        if RE_CODE.search(self.content):
            return True
        else:
            return False

    @cached_property
    def review(self):
        if "review" not in self.meta:
            return False
        review = self.meta.get("review")
        rated, outof = review.get("rating").split("/")
        r = {
            "@context": "https://schema.org/",
            "@type": "Review",
            "reviewRating": {
                "@type": "Rating",
                "@context": "http://schema.org",
                "ratingValue": rated,
                "bestRating": outof,
                "worstRating": 1,
            },
            "name": review.get("title"),
            "text": review.get("summary"),
            "url": review.get("url"),
            "author": settings.author,
        }
        return r

    @cached_property
    def event(self):
        if "event" not in self.meta:
            return False
        event = self.meta.get("event", {})
        r = {
            "@context": "http://schema.org",
            "@type": "Event",
            "endDate": str(arrow.get(event.get("end"))),
            "startDate": str(arrow.get(event.get("start"))),
            "location": {
                "@context": "http://schema.org",
                "@type": "Place",
                "address": event.get("location"),
                "name": event.get("location"),
            },
            "name": self.title,
        }
        return r

    @cached_property
    def jsonld(self):
        r = {
            "@context": "http://schema.org",
            "@type": "Article",
            "@id": self.url,
            "inLanguage": self.lang,
            "headline": self.title,
            "url": self.url,
            "genre": self.category,
            "mainEntityOfPage": f"{self.url}#article",
            "dateModified": str(self.dt),
            "datePublished": str(self.published),
            "copyrightYear": str(self.published.format("YYYY")),
            "license": f"https://spdx.org/licenses/{self.licence}.html",
            "image": settings.site.image,
            "author": settings.author,
            "sameAs": self.sameas,
            "publisher": settings.site.publisher,
            "name": self.name,
            "text": self.html_content,
            "description": self.html_summary,
            "potentialAction": [],
            "comment": [],
            "commentCount": len(self.comments.keys()),
            "keywords": self.tags,
        }

        if self.is_photo:
            r.update({"@type": "Photograph"})
        elif self.has_code:
            r.update({"@type": "TechArticle"})
        elif self.is_page:
            r.update({"@type": "WebPage"})
        if len(self.images):
            r["image"] = []
            for img in list(self.images.values()):
                imgld = img.jsonld
                imgld["text"] = str(img)
                r["image"].append(imgld)

        if self.is_reply:
            r.update(
                {
                    "mentions": {
                        "@context": "http://schema.org",
                        "@type": "Thing",
                        "url": self.is_reply,
                    }
                }
            )

        if self.review:
            r.update({"review": self.review})

        if self.event:
            r.update({"subjectOf": self.event})

        for url in list(set(self.to_syndicate)):
            r["potentialAction"].append(
                {
                    "@context": "http://schema.org",
                    "@type": "InteractAction",
                    "target": url,
                }
            )

        for mtime in sorted(self.comments.keys()):
            r["comment"].append(self.comments[mtime].jsonld)

        return settings.nameddict(r)

    @property
    def template(self):
        return f"{self.__class__.__name__}.j2.html"

    @property
    def txttemplate(self):
        return f"{self.__class__.__name__}.j2.txt"

    @property
    def renderdir(self):
        return os.path.join(settings.paths.get("build"), self.name)

    @property
    def renderfile(self):
        return os.path.join(self.renderdir, settings.filenames.html)

    @property
    def txtfile(self):
        return os.path.join(self.renderdir, settings.filenames.txt)

    @property
    def exists(self):
        if settings.args.get("force"):
            logger.debug("rendering required: force mode on")
            return False
        maybe = self.dt.timestamp
        if len(self.files):
            for f in self.files:
                maybe = max(maybe, mtime(f))
        for f in [self.renderfile, self.txtfile]:
            if not os.path.exists(f):
                logger.debug(f"rendering required: no {f} yet")
                return False
            elif maybe > mtime(f):
                logger.debug(f"rendering required: self.dt > {f} mtime")
                return False
        logger.debug("rendering not required")
        return True

    @property
    def corpus(self):
        return "\n".join(
            [self.title, self.name, self.summary, self.content]
        )

    async def copy_files(self):
        exclude = [
            ".md",
            ".jpg",
            ".png",
            ".gif",
            ".ping",
            ".url",
            ".del",
            ".copy",
            ".cache",
        ]
        include = ["map.png"]
        files = glob.glob(
            os.path.join(os.path.dirname(self.fpath), "*.*")
        )
        for f in files:
            fname = os.path.basename(f)
            fbasename, fext = os.path.splitext(fname)
            if fext.lower() in exclude and fname.lower() not in include:
                continue

            t = os.path.join(
                settings.paths.get("build"), self.name, fname
            )
            if os.path.exists(t) and mtime(f) <= mtime(t):
                continue
            logger.info("copying '%s' to '%s'", f, t)
            cp(f, t)

    async def render_map(self):
        if not self.is_photo:
            return

        if "locationCreated" not in self.photo.jsonld:
            return

        style = settings.mapbox.style
        size = settings.mapbox.size
        lat = round(
            float(
                self.photo.jsonld["locationCreated"]["geo"]["latitude"]
            ),
            3,
        )
        lon = round(
            float(
                self.photo.jsonld["locationCreated"]["geo"]["longitude"]
            ),
            3,
        )
        token = keys.mapbox.get("private")
        mapfpath = os.path.join(self.dirpath, "map.png")
        if os.path.exists(mapfpath):
            return

        url = f"https://api.mapbox.com/styles/v1/mapbox/{style}/static/pin-s({lon},{lat})/{lon},{lat},11,20/{size}?access_token={token}"
        logger.info("requesting map for %s with URL %s", self.name, url)
        with requests.get(url, stream=True) as r:
            with open(mapfpath, "wb") as f:
                copyfileobj(r.raw, f)

    @property
    def has_archive(self):
        return len(
            glob.glob(os.path.join(self.dirpath, f"*archiveorg*.copy"))
        )

    async def get_from_archiveorg(self):
        if self.has_archive:
            return
        if self.is_future:
            return
        if (
            self.published.timestamp + 86400
        ) > arrow.utcnow().timestamp:
            return
        logger.info("archive.org .copy is missing for %s", self.name)
        if len(self.category) and not (
            settings.args.get("noservices")
            or settings.args.get("offline")
        ):
            wb = wayback.FindWaybackURL(self.name, self.category)
            wb.run()
            if len(wb.oldest):
                archiveurl = url2slug(wb.oldest)
                t = os.path.join(self.dirpath, f"{archiveurl}.copy")
                writepath(t, wb.oldest)
            del wb

    async def render(self):
        await self.get_from_archiveorg()
        await self.render_map()

        if self.exists:
            return True

        logger.info("rendering %s", self.name)
        v = {
            "baseurl": self.url,
            "post": self.jsonld,
            "site": settings.site,
            "menu": settings.menu,
            "meta": settings.meta,
            "fnames": settings.filenames,
        }
        writepath(
            self.renderfile, J2.get_template(self.template).render(v)
        )
        del v

        g = {
            "post": self.jsonld,
            "summary": self.txt_summary,
            "content": self.txt_content,
        }
        writepath(
            self.txtfile, J2.get_template(self.txttemplate).render(g)
        )
        del g


class Home(Singular):
    def __init__(self, fpath):
        super().__init__(fpath)
        self.cdata = {}
        self.pdata = {}

    def add(self, category, post):
        if not len(category.name):
            return

        if category.name not in self.cdata:
            self.cdata[category.name] = category

        if category.name not in self.pdata:
            self.pdata[category.name] = post
        else:
            current = arrow.get(self.pdata[category.name].datePublished)
            if current > post.published:
                return
            else:
                self.pdata[category.name] = post
                return

    @property
    def posts(self):
        flattened = []
        order = {}
        for cname, post in self.pdata.items():
            order[post.published.timestamp] = cname

        for mtime in sorted(order.keys(), reverse=True):
            category = self.cdata[order[mtime]].ctmplvars
            post = self.pdata[order[mtime]].jsonld
            flattened.append((category, post))

        return flattened

    @property
    def renderdir(self):
        return settings.paths.get("build")

    @property
    def renderfile(self):
        return os.path.join(
            settings.paths.get("build"), settings.filenames.html
        )

    @property
    def dt(self):
        ts = 0
        for cat, post in self.posts:
            ts = max(ts, arrow.get(post["dateModified"]).timestamp)
        return arrow.get(ts)

    async def render_gopher(self):
        lines = ["%s's gopherhole" % (settings.site.name), "", ""]

        for category, post in self.posts:
            line = "1%s\t/%s/%s\t%s\t70" % (
                category["name"],
                settings.paths.category,
                category["name"],
                settings.site.name,
            )
            lines.append(line)
        lines.append("")
        writepath(
            self.renderfile.replace(
                settings.filenames.html, settings.filenames.gopher
            ),
            "\r\n".join(lines),
        )

    async def render(self):
        if self.exists:
            return
        logger.info("rendering %s", self.name)
        r = J2.get_template(self.template).render(
            {
                "baseurl": settings.site.get("url"),
                "post": self.jsonld,
                "site": settings.site,
                "menu": settings.menu,
                "meta": settings.meta,
                "posts": self.posts,
                "fnames": settings.filenames,
            }
        )
        writepath(self.renderfile, r)
        await self.render_gopher()


class PHPFile(object):
    @property
    def exists(self):
        if settings.args.get("force"):
            return False
        if not os.path.exists(self.renderfile):
            return False
        if self.mtime > mtime(self.renderfile):
            return False
        return True

    @property
    def mtime(self):
        return mtime(
            os.path.join(settings.paths.get("tmpl"), self.templatefile)
        )

    @property
    def renderfile(self):
        raise ValueError("Not implemented")

    @property
    def templatefile(self):
        raise ValueError("Not implemented")

    async def render(self):
        # if self.exists:
        #     return
        await self._render()


class Search(PHPFile):
    def __init__(self):
        self.fpath = os.path.join(
            settings.paths.get("build"), "search.sqlite"
        )
        self.db = sqlite3.connect(self.fpath)
        self.db.execute("PRAGMA auto_vacuum = INCREMENTAL;")
        self.db.execute("PRAGMA journal_mode = MEMORY;")
        self.db.execute("PRAGMA temp_store = MEMORY;")
        self.db.execute("PRAGMA locking_mode = NORMAL;")
        self.db.execute("PRAGMA synchronous = FULL;")
        self.db.execute('PRAGMA encoding = "UTF-8";')
        self.db.execute(
            """
            CREATE VIRTUAL TABLE IF NOT EXISTS data USING fts4(
                url,
                mtime,
                name,
                title,
                category,
                content,
                notindexed=category,
                notindexed=url,
                notindexed=mtime,
                tokenize=porter
            )"""
        )
        self.is_changed = False

    def __exit__(self):
        if self.is_changed:
            self.db.commit()
            self.db.execute("PRAGMA auto_vacuum;")
        self.db.close()

    def check(self, name):
        ret = 0
        maybe = self.db.execute(
            """
            SELECT
                mtime
            FROM
                data
            WHERE
                name = ?
            """,
            (name,),
        ).fetchone()
        if maybe:
            ret = int(maybe[0])
        return ret

    def append(self, post):
        mtime = int(post.published.timestamp)
        check = self.check(post.name)
        if check and check < mtime:
            self.db.execute(
                """
                DELETE
                FROM
                    data
                WHERE
                    name=?""",
                (post.name,),
            )
            check = False
        if not check:
            self.db.execute(
                """
                INSERT INTO
                    data
                    (url, mtime, name, title, category, content)
                VALUES
                    (?,?,?,?,?,?);
                """,
                (
                    post.url,
                    mtime,
                    post.name,
                    post.title,
                    post.category,
                    post.content,
                ),
            )
            self.is_changed = True

    @property
    def templates(self):
        return ["Search.j2.php", "OpenSearch.j2.xml"]

    async def _render(self):
        for template in self.templates:
            r = J2.get_template(template).render(
                {
                    "post": {},
                    "site": settings.site,
                    "menu": settings.menu,
                    "meta": settings.meta,
                }
            )
            target = os.path.join(
                settings.paths.get("build"),
                template.replace(".j2", "").lower(),
            )
            writepath(target, r)


class IndexPHP(PHPFile):
    def __init__(self):
        self.gone = {}
        self.redirect = {}

    def add_gone(self, uri):
        self.gone[uri] = True

    def add_redirect(self, source, target):
        if target in self.gone:
            self.add_gone(source)
        else:
            if "://" not in target:
                target = "%s/%s" % (settings.site.get("url"), target)
            self.redirect[source] = target

    @property
    def renderfile(self):
        return os.path.join(settings.paths.get("build"), "index.php")

    @property
    def templatefile(self):
        return "404.j2.php"

    async def _render(self):
        r = J2.get_template(self.templatefile).render(
            {
                "post": {},
                "site": settings.site,
                "menu": settings.menu,
                "gones": self.gone,
                "redirects": self.redirect,
                "rewrites": settings.rewrites,
                "gone_re": settings.gones,
            }
        )
        writepath(self.renderfile, r)


class Micropub(PHPFile):
    def __init__(self):
        self.collected_tags = {}

    @property
    def tags(self):
        return list(self.collected_tags.keys())

    def add_tag(self, tag):
        self.add_tags([tag])

    def add_tags(self, tags):
        for tag in tags:
            self.collected_tags.update({tag: True})

    @property
    def renderfile(self):
        return os.path.join(settings.paths.get("build"), "micropub.php")

    @property
    def templatefile(self):
        return "%s.j2.php" % (self.__class__.__name__)

    async def _render(self):
        r = J2.get_template(self.templatefile).render(
            {
                "wallabag": keys.wallabag,
                "site": settings.site,
                "paths": settings.paths,
                "tags": {"tags": self.tags},
            }
        )
        writepath(self.renderfile, r)


class WorldMap(object):
    def __init__(self):
        self.data = {}
        self.mtime = 0

    def add(self, post):
        if not post.is_photo:
            return
        if not post.photo.jsonld.locationCreated:
            return

        lat = post.photo.jsonld.locationCreated.geo.latitude
        lon = post.photo.jsonld.locationCreated.geo.longitude
        k = (lat, lon)
        content = f'<p><a href="{post.url}"><img src="{post.photo.src}" style="width: 150px; height: auto" /><br />{post.title}</a></p>'
        # d = {"latitude": nlat, "longitude": nlon, "popup": content}
        if k in self.data:
            self.data[k].append(content)
        else:
            self.data[k] = [content]
        self.mtime = max(post.dt.timestamp, self.mtime)

    @property
    def exists(self):
        if settings.args.get("force"):
            return False
        if not os.path.exists(self.renderfile):
            return False
        if mtime(self.renderfile) >= self.mtime:
            return True
        return False

    @property
    def renderfile(self):
        return os.path.join(
            settings.paths.get("build"), settings.filenames.worldmap
        )

    @property
    def template(self):
        return "%s.j2.html" % (self.__class__.__name__)

    @property
    def tmplvars(self):
        return {
            "token": keys.mapbox.get(settings.site.name),
            "site": settings.site,
            "menu": settings.menu,
            "meta": settings.meta,
            "geo": self.data,
        }

    async def render(self):
        if self.exists:
            return
        logger.info(
            "rendering %s to %s", self.__class__, self.renderfile
        )
        writepath(
            self.renderfile,
            J2.get_template(self.template).render(self.tmplvars),
        )


class Category(dict):
    def __init__(self, name=""):
        self.name = name

    def __setitem__(self, key, value):
        if key in self:
            raise LookupError(
                f"key '{key}' already exists, colliding posts are: {self[key].fpath} vs {value.fpath}"
            )
        dict.__setitem__(self, key, value)

    @property
    def title(self):
        if len(self.name):
            return f"{self.name} - {settings.site.name}"
        else:
            return settings.site.headline

    @property
    def url(self):
        if len(self.name):
            url = f"{settings.site.url}/{settings.paths.category}/{self.name}/"
        else:
            url = f"{settings.site.url}/"
        return url

    @property
    def feedurl(self):
        return f"{self.url}{settings.paths.feed}/"

    @property
    def sortedkeys(self):
        return list(sorted(self.keys(), reverse=True))

    @property
    def ctmplvars(self):
        return {
            "name": self.name,
            "url": self.url,
            "feed": self.feedurl,
            "title": self.title,
        }

    @property
    def renderdir(self):
        b = settings.paths.build
        if len(self.name):
            b = os.path.join(b, settings.paths.category, self.name)
        return b

    @property
    def newest_year(self):
        return arrow.get(max(self.keys())).format("YYYY")

    @cached_property
    def years(self):
        years = {}
        for key in list(sorted(self.keys(), reverse=True)):
            year = arrow.get(int(key)).format("YYYY")
            if year in years:
                continue
            if year == self.newest_year:
                url = f"{self.url}{settings.filenames.html}"
            else:
                url = f"{self.url}{year}/{settings.filenames.html}"
            years.update({year: url})
        return years

    async def render_feeds(self):
        await self.AtomFeed(self).render()
        await self.RSSFeed(self).render()
        await self.JSONFeed(self).render()

    async def render(self):
        await self.render_feeds()
        await self.Gopher(self).render()
        if self.name in settings.flat:
            await self.Flat(self).render()
        else:
            for year in sorted(self.years.keys()):
                await self.Year(self, year).render()

    class JSONFeed(object):
        def __init__(self, parent):
            self.parent = parent

        @property
        def mtime(self):
            return max(
                list(sorted(self.parent.keys(), reverse=True))[
                    0 : settings.pagination
                ]
            )

        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir,
                settings.paths.feed,
                settings.filenames.json,
            )

        @property
        def exists(self):
            if settings.args.get("force"):
                return False
            if not os.path.exists(self.renderfile):
                return False
            if mtime(self.renderfile) >= self.mtime:
                return True
            return False

        async def render(self):
            if self.exists:
                logger.debug(
                    "category %s is up to date", self.parent.name
                )
                return

            logger.info(
                "rendering JSON feed for category %s", self.parent.name
            )

            js = {
                "version": "https://jsonfeed.org/version/1",
                "title": self.parent.title,
                "home_page_url": settings.site.url,
                "feed_url": f"{self.parent.url}{settings.filenames.json}",
                "author": {
                    "name": settings.author.name,
                    "url": settings.author.url,
                    "avatar": settings.author.image,
                },
                "items": [],
            }

            for key in list(sorted(self.parent.keys(), reverse=True))[
                0 : settings.pagination
            ]:
                post = self.parent[key]
                pjs = {
                    "id": post.url,
                    "content_text": post.txt_content,
                    "content_html": post.html_content,
                    "url": post.url,
                    "date_published": str(post.published),
                }
                if len(post.summary):
                    pjs.update({"summary": post.txt_summary})
                if post.is_photo:
                    pjs.update(
                        {
                            "attachment": {
                                "url": post.photo.href,
                                "mime_type": post.photo.mime_type,
                                "size_in_bytes": f"{post.photo.mime_size}",
                            }
                        }
                    )
                js["items"].append(pjs)
            writepath(
                self.renderfile,
                json.dumps(js, indent=4, ensure_ascii=False),
            )

    class XMLFeed(object):
        def __init__(self, parent):
            self.parent = parent

        def init_entry(self, post):
            fe = FeedEntry()
            fe.id(post.url)
            fe.title(post.title)
            fe.published(post.published.datetime)
            fe.updated(arrow.get(post.dt).datetime)
            lname = post.licence.upper()
            lyear = post.published.format("YYYY")
            fe.rights(f"{lname} {settings.author.name} {lyear}")
            fe.author(
                {
                    "name": settings.author.name,
                    "email": settings.author.email,
                }
            )
            categories = []
            for tag in set(post.tags):
                categories.append(
                    {
                        "term": tag
                        # "scheme": "https://schema.org/keywords"
                    }
                )
            fe.category(categories)
            if post.is_photo:
                fe.enclosure(
                    post.photo.href,
                    "%d" % post.photo.mime_size,
                    post.photo.mime_type,
                )
            return fe

        @property
        def rkeys(self):
            rkeys = list(sorted(self.parent.keys(), reverse=True))
            rkeys = rkeys[0 : settings.pagination]
            rkeys = list(sorted(rkeys, reverse=False))
            return rkeys

        @property
        def mtime(self):
            return max(
                list(sorted(self.parent.keys(), reverse=True))[
                    0 : settings.pagination
                ]
            )

        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir, settings.paths.feed, "index.xml"
            )

        @property
        def exists(self):
            if settings.args.get("force"):
                return False
            if not os.path.exists(self.renderfile):
                return False
            if mtime(self.renderfile) >= self.mtime:
                return True
            return False

        def uptodate(self):
            logger.debug(
                "category %s %s is up to date",
                self.parent.name,
                self.__class__.__name__,
            )

        def notuptodate(self):
            logger.info(
                "rendering %s feed for category %s",
                self.__class__.__name__,
                self.parent.name,
            )

        def init_fg(self):
            fg = FeedGenerator()
            fg.id(self.parent.feedurl)
            fg.title(self.parent.title)
            fg.logo(settings.site.image)
            fg.updated(arrow.get(self.mtime).to("utc").datetime)
            fg.description(settings.site.headline)
            fg.link(href=self.parent.feedurl)
            fg.author(
                {
                    "name": settings.author.name,
                    "email": settings.author.email,
                }
            )
            return fg

    class AtomFeed(XMLFeed):
        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir,
                settings.paths.feed,
                settings.filenames.atom,
            )

        async def render(self):
            if self.exists:
                self.uptodate()
                return

            self.notuptodate()
            fg = self.init_fg()

            for key in self.rkeys:
                post = self.parent[key]
                fe = self.init_entry(post)
                fe.link(
                    href=post.url, rel="alternate", type="text/html"
                )
                # fe.content(src=post.url, type="text/html")
                fe.content(post.html_content, type="html")
                if len(post.summary):
                    fe.summary(post.summary)
                fg.add_entry(fe)

            writepath(self.renderfile, fg.atom_str(pretty=True))

    class RSSFeed(XMLFeed):
        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir,
                settings.paths.feed,
                settings.filenames.rss,
            )

        async def render(self):
            if self.exists:
                self.uptodate()
                return

            self.notuptodate()
            fg = self.init_fg()

            for key in self.rkeys:
                post = self.parent[key]
                fe = self.init_entry(post)
                fe.link(href=post.url)
                fe.content(post.html_content, type="CDATA")
                fg.add_entry(fe)

            output = fg.rss_str(pretty=True)
            writepath(self.renderfile, output)

    class Year(object):
        def __init__(self, parent, year):
            self.parent = parent
            self.year = str(year)

        @cached_property
        def keys(self):
            year = arrow.get(self.year, "YYYY").to("utc")
            keys = []
            for key in list(sorted(self.parent.keys(), reverse=True)):
                ts = arrow.get(int(key))
                if ts <= year.ceil("year") and ts >= year.floor("year"):
                    keys.append(int(key))
            return keys

        @property
        def posttmplvars(self):
            return [self.parent[key].jsonld for key in self.keys]

        @property
        def mtime(self):
            return max(self.keys)

        @property
        def renderfile(self):
            if self.year == self.parent.newest_year:
                return os.path.join(
                    self.parent.renderdir, settings.filenames.html
                )
            else:
                return os.path.join(
                    self.parent.renderdir,
                    self.year,
                    settings.filenames.html,
                )

        @property
        def baseurl(self):
            if self.year == self.parent.newest_year:
                return self.parent.url
            else:
                return f"{self.parent.url}{self.year}/"

        @property
        def template(self):
            return "%s.j2.html" % (self.__class__.__name__)

        @property
        def exists(self):
            if settings.args.get("force"):
                return False
            if not os.path.exists(self.renderfile):
                return False
            if mtime(self.renderfile) >= self.mtime:
                return True
            return False

        @property
        def tmplvars(self):
            return {
                "baseurl": self.baseurl,
                "site": settings.site,
                "menu": settings.menu,
                "meta": settings.meta,
                "fnames": settings.filenames,
                "category": {
                    "name": self.parent.name,
                    "url": self.parent.url,
                    "feed": self.parent.feedurl,
                    "title": self.parent.title,
                    "paginated": True,
                    "years": self.parent.years,
                    "year": self.year,
                },
                "posts": self.posttmplvars,
            }

        async def render(self):
            if self.exists:
                logger.debug(
                    "category %s is up to date", self.parent.name
                )
                return
            logger.info(
                "rendering year %s for category %s",
                self.year,
                self.parent.name,
            )
            r = J2.get_template(self.template).render(self.tmplvars)
            writepath(self.renderfile, r)
            del r

    class Flat(object):
        def __init__(self, parent):
            self.parent = parent

        @property
        def posttmplvars(self):
            return [
                self.parent[key].jsonld
                for key in list(
                    sorted(self.parent.keys(), reverse=True)
                )
            ]

        @property
        def mtime(self):
            return max(self.parent.keys())

        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir, settings.filenames.html
            )

        @property
        def template(self):
            return "%s.j2.html" % (self.__class__.__name__)

        @property
        def exists(self):
            if settings.args.get("force"):
                return False
            if not os.path.exists(self.renderfile):
                return False
            if mtime(self.renderfile) >= self.mtime:
                return True
            return False

        @property
        def tmplvars(self):
            return {
                "baseurl": self.parent.url,
                "site": settings.site,
                "menu": settings.menu,
                "meta": settings.meta,
                "fnames": settings.filenames,
                "category": {
                    "name": self.parent.name,
                    "url": self.parent.url,
                    "feed": self.parent.feedurl,
                    "title": self.parent.title,
                },
                "posts": self.posttmplvars,
            }

        async def render(self):
            if self.exists:
                logger.debug(
                    "category %s is up to date", self.parent.name
                )
                return
            logger.info("rendering category %s", self.parent.name)
            r = J2.get_template(self.template).render(self.tmplvars)
            writepath(self.renderfile, r)
            del r

    class Gopher(object):
        def __init__(self, parent):
            self.parent = parent

        @property
        def mtime(self):
            return max(self.parent.keys())

        @property
        def exists(self):
            if settings.args.get("force"):
                return False
            if not os.path.exists(self.renderfile):
                return False
            if mtime(self.renderfile) >= self.mtime:
                return True
            return False

        @property
        def renderfile(self):
            return os.path.join(
                self.parent.renderdir, settings.filenames.gopher
            )

        async def render(self):
            if self.exists:
                logger.debug(
                    "category %s is up to date", self.parent.name
                )
                return

            lines = [
                "%s - %s" % (self.parent.name, settings.site.name),
                "",
                "",
            ]
            for post in [
                self.parent[key]
                for key in list(
                    sorted(self.parent.keys(), reverse=True)
                )
            ]:
                line = "0%s\t/%s/%s\t%s\t70" % (
                    post.title,
                    post.name,
                    settings.filenames.txt,
                    settings.site.name,
                )
                lines.append(line)
                if len(post.txt_summary):
                    lines.extend(post.txt_summary.split("\n"))
                for img in post.images.values():
                    line = "I%s\t/%s/%s\t%s\t70" % (
                        img.title,
                        post.name,
                        img.name,
                        settings.site.name,
                    )
                    lines.append(line)
                lines.append("")
            writepath(self.renderfile, "\r\n".join(lines))


class Sitemap(dict):
    @property
    def mtime(self):
        r = 0
        if os.path.exists(self.renderfile):
            r = mtime(self.renderfile)
        return r

    def append(self, post):
        self[post.url] = post.mtime

    @property
    def renderfile(self):
        return os.path.join(
            settings.paths.get("build"), settings.filenames.sitemap
        )

    async def render(self):
        if len(self) > 0:
            if self.mtime >= sorted(self.values())[-1]:
                return
            with open(self.renderfile, "wt") as f:
                f.write("\n".join(sorted(self.keys())))


class Webmention(object):
    """ outgoing webmention class """

    def __init__(self, source, target, dpath, mtime=0):
        self.source = source
        self.target = target
        self.dpath = dpath
        if not mtime:
            mtime = arrow.utcnow().timestamp
        self.mtime = mtime

    @property
    def fpath(self):
        return os.path.join(
            self.dpath, "%s.ping" % (url2slug(self.target))
        )

    @property
    def exists(self):
        if not os.path.isfile(self.fpath):
            return False
        elif mtime(self.fpath) > self.mtime:
            return True
        else:
            return False

    def save(self, content):
        writepath(self.fpath, content)

    async def send(self):
        if self.exists:
            return
        elif settings.args.get("noping"):
            self.save("noping entry at %s" % arrow.now())
            return

        telegraph_url = "https://telegraph.p3k.io/webmention"
        telegraph_params = {
            "token": "%s" % (keys.telegraph.get("token")),
            "source": "%s" % (self.source),
            "target": "%s" % (self.target),
        }
        r = requests.post(telegraph_url, data=telegraph_params)
        logger.info(
            "sent webmention to telegraph from %s to %s",
            self.source,
            self.target,
        )
        if r.status_code not in [200, 201, 202]:
            logger.error("sending failed: %s %s", r.status_code, r.text)
        else:
            self.save(r.text)
|
|
|
|
async def backfill_syndication(self):
|
|
""" this is very specific to webmention.io and brid.gy publish """

        if not self.exists:
            return
        if "fed.brid.gy" in self.target:
            return
        if "brid.gy" not in self.target:
            return

        with open(self.fpath, "rt") as f:
            txt = f.read()

        try:
            data = json.loads(txt)
        except Exception:
            # if it's not JSON, it's a manually placed file: ignore it
            logger.debug("not a JSON webmention at %s", self.fpath)
            return

        # unprocessed webmention
        if "http_body" not in data and "location" in data:
            logger.debug(
                "fetching webmention.io response from %s",
                data["location"],
            )
            wio = requests.get(data["location"])
            if wio.status_code != requests.codes.ok:
                logger.debug("fetching %s failed", data["location"])
                return

            try:
                wio_json = json.loads(wio.text)
                logger.debug("got response %s", wio_json)
                if "http_body" in wio_json and isinstance(
                    wio_json["http_body"], str
                ):
                    wio_json.update(
                        {
                            "http_body": json.loads(
                                "".join(wio_json["http_body"])
                            )
                        }
                    )
                if "original" in wio_json["http_body"].keys():
                    wio_json.update(
                        {"http_body": wio_json["http_body"]["original"]}
                    )
                data = {**data, **wio_json}
            except Exception as e:
                logger.error(
                    "failed JSON from webmention.io %s because: %s",
                    wio.text,
                    e,
                )
                return

            logger.debug(
                "saving updated webmention.io data %s to %s",
                data,
                self.fpath,
            )
            with open(self.fpath, "wt") as update:
                update.write(
                    json.dumps(data, sort_keys=True, indent=4)
                )

        if "http_body" in data.keys():
            # healthy and processed webmention
            if (
                isinstance(data["http_body"], dict)
                and "url" in data["http_body"].keys()
            ):
                url = data["http_body"]["url"]
                sp = os.path.join(
                    self.dpath, "%s.copy" % url2slug(url)
                )
                if os.path.exists(sp):
                    logger.debug(
                        "syndication already exists for %s", url
                    )
                    return
                with open(sp, "wt") as f:
                    logger.info(
                        "writing syndication copy %s to %s", url, sp
                    )
                    f.write(url)
        return


class WebmentionIO(object):
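    # polls the webmention.io API for new incoming mentions and stores each
    # one as a YAML + markdown comment file next to the post it targets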
    def __init__(self):
        self.params = {
            "token": "%s" % (keys.webmentionio.get("token")),
            "since": "%s" % str(self.since),
            "domain": "%s" % (keys.webmentionio.get("domain")),
        }
        self.url = "https://webmention.io/api/mentions"

    @property
    def since(self):
        newest = 0
        content = settings.paths.get("content")
        for e in glob.glob(os.path.join(content, "*", "*", "*.md")):
            if os.path.basename(e) == settings.filenames.md:
                continue
            # filenames are like [received epoch]-[slugified source url].md
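            # e.g. 1547345717-example-org-some-reply.md (hypothetical name)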
            try:
                mtime = int(os.path.basename(e).split("-")[0])
            except Exception as exc:
                logger.error(
                    "int conversion failed: %s, file was: %s", exc, e
                )
                continue
            if mtime > newest:
                newest = mtime
        return arrow.get(newest + 1)

    def makecomment(self, webmention):
        # fall back to the verification date when "published" is missing
        maybe = webmention.get("data", {}).get("published")
        if not maybe or maybe == "None":
            dt = arrow.get(webmention.get("verified_date"))
        else:
            dt = arrow.get(maybe)

        slug = os.path.split(
            urlparse(webmention.get("target")).path.lstrip("/")
        )[0]

        author = webmention.get("data", {}).get("author", None)
        if not author:
            logger.error(
                "missing author info on incoming webmention from %s; skipping",
                webmention.get("source"),
            )
            return

        # ignore selfpings
        if slug == settings.site.get("name"):
            return
        elif not len(slug):
            logger.error(
                "couldn't find post for incoming webmention: %s",
                webmention.get("source"),
            )
            return

        fdir = glob.glob(
            os.path.join(settings.paths.get("content"), "*", slug)
        )

        if not len(fdir):
            logger.error(
                "couldn't find post for incoming webmention: %s",
                webmention.get("source"),
            )
            return
        elif len(fdir) > 1:
            logger.error(
                "multiple posts found for incoming webmention: %s",
                webmention.get("source"),
            )
            return

        fdir = fdir.pop()
        fpath = os.path.join(
            fdir,
            "%d-%s.md"
            % (dt.timestamp, url2slug(webmention.get("source"))),
        )

        meta = {
            "author": {
                "name": author.get("name", ""),
                "url": author.get("url", ""),
                "photo": author.get("photo", ""),
            },
            "date": str(dt),
            "source": webmention.get("source"),
            "target": webmention.get("target"),
            "type": webmention.get("activity").get(
                "type", "webmention"
            ),
        }

        try:
            txt = webmention.get("data").get("content", "").strip()
        except Exception:
            txt = ""

        r = "---\n%s\n---\n\n%s\n" % (utfyamldump(meta), txt)
        writepath(fpath, r, mtime=dt.timestamp)
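
        # the stored comment is YAML front matter plus the plain-text reply;
        # roughly, with hypothetical values:
        #
        #   ---
        #   author:
        #       name: Example Commenter
        #       photo: https://example.org/avatar.jpg
        #       url: https://example.org/
        #   date: '2019-01-13T02:15:17+00:00'
        #   source: https://example.org/reply/
        #   target: https://example.net/category/post/
        #   type: reply
        #   ---
        #
        #   nice post!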

    def run(self):
        webmentions = requests.get(self.url, params=self.params)
        logger.info("queried webmention.io with: %s", webmentions.url)
        if webmentions.status_code != requests.codes.ok:
            return
        try:
            mentions = webmentions.json()
            for webmention in mentions.get("links"):
                self.makecomment(webmention)
        except ValueError as e:
            logger.error(
                "failed to parse webmention.io response: %s", e
            )


def make():
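    # build pipeline overview: pull incoming webmentions, walk every post,
    # queue the async render / image downsize / file copy tasks, render the
    # aggregate pages (categories, feeds, home, sitemap, search, worldmap),
    # rsync the build directory to the server, then send outgoing webmentions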
    start = int(round(time.time() * 1000))

    # get incoming webmentions
    if not (
        settings.args.get("offline") or settings.args.get("noservices")
    ):
        incoming = WebmentionIO()
        incoming.run()
    # TODO get queued micropub posts?

    queue = AQ()
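    # AQ (an asyncio task-queue helper defined earlier in this file) collects
    # coroutines via put() and drains them all when run() is called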
    outbox = []
    to_archive = []

    content = settings.paths.get("content")
    rules = IndexPHP()

    sitemap = Sitemap()
    search = Search()
    categories = {}
    frontposts = Category()
    home = Home(settings.paths.get("home"))
    worldmap = WorldMap()
    postcount = 0
    commentcount = 0
    micropub = Micropub()

    for e in sorted(
        glob.glob(
            os.path.join(content, "*", "*", settings.filenames.md)
        )
    ):
        post = Singular(e)
        if not post.is_future:
            postcount = postcount + 1
            commentcount = commentcount + len(post.comments)
            worldmap.add(post)
            for i in post.to_ping:
                outbox.append(i)
                if not (
                    settings.args.get("offline")
                    or settings.args.get("noservices")
                ):
                    queue.put(i.backfill_syndication())
            micropub.add_tags(post.tags)

        for i in post.images.values():
            queue.put(i.downsize())

        # if not post.is_future and not post.has_archive:
        #     to_archive.append(post.url)

        # render and arbitrary file copy tasks for this very post
        queue.put(post.render())
        queue.put(post.copy_files())

        # skip future posts from anything further
        if post.is_future:
            logger.info("%s is for the future", post.name)
            continue

        # add post to the search database
        search.append(post)

        # start populating the sitemap
        sitemap.append(post)

        # populate redirects, if any
        rules.add_redirect(post.shortslug, post.url)

        # categories starting with '_' are special: they shouldn't get a
        # category archive page
        if post.is_page:
            continue

        # populate the category with the post
        if post.category not in categories:
            categories[post.category] = Category(post.category)
        categories[post.category][post.published.timestamp] = post

        # add to the front page, if allowed
        if post.is_front:
            frontposts[post.published.timestamp] = post

    # commit the search database - this saves quite a few disk writes
    search.__exit__()

    # render search and sitemap
    queue.put(search.render())
    queue.put(sitemap.render())

    # make gone and redirect arrays for PHP
    for e in glob.glob(os.path.join(content, "*", "*.del")):
        post = Gone(e)
        rules.add_gone(post.source)
    for e in glob.glob(os.path.join(content, "*", "*.url")):
        post = Redirect(e)
        rules.add_redirect(post.source, post.target)
        queue.put(post.render())
    # render 404 fallback PHP
    queue.put(rules.render())

    # render categories
    for category in categories.values():
        home.add(category, category.get(category.sortedkeys[0]))
        queue.put(category.render())

    # RSS and ATOM feeds
    queue.put(frontposts.render_feeds())

    # home
    queue.put(home.render())

    # worldmap for photos
    queue.put(worldmap.render())

    # fediverse stats
    fediversemockery = FediverseStats(postcount, commentcount)
    queue.put(fediversemockery.render())

    # micropub handler PHP
    queue.put(micropub.render())

    # render all the things!
    queue.run()

    # copy static files
    for e in glob.glob(os.path.join(content, "*.*")):
        if e.endswith(".md"):
            continue
        t = os.path.join(
            settings.paths.get("build"), os.path.basename(e)
        )
        maybe_copy(e, t)

    end = int(round(time.time() * 1000))
    logger.info("process took %d ms", end - start)

    if not settings.args.get("offline"):
        # upload site
        try:
            logger.info("starting syncing")
            os.system(
                f"rsync -avuhH --exclude='.git' --delete-after "
                f"{settings.paths.build}/ "
                f"{settings.syncserver}:{settings.paths.remotewww}"
            )
            logger.info("syncing finished")
        except Exception as e:
            logger.error("syncing failed: %s", e)

    if not settings.args.get("noservices"):
        logger.info("sending webmentions")
        for wm in outbox:
            queue.put(wm.send())
        queue.run()
        logger.info("sending webmentions finished")


if __name__ == "__main__":
    make()