nasg/nasg.py

2722 lines
78 KiB
Python
Raw Permalink Normal View History

2017-05-23 11:14:47 +01:00
#!/usr/bin/env python3
2017-12-17 17:37:32 +00:00
__author__ = "Peter Molnar"
__copyright__ = "Copyright 2017-2019, Peter Molnar"
__license__ = "apache-2.0"
2017-12-17 17:37:32 +00:00
__maintainer__ = "Peter Molnar"
2018-04-30 20:44:04 +01:00
__email__ = "mail@petermolnar.net"
2017-05-23 11:14:47 +01:00
2018-07-20 16:45:42 +01:00
import glob
2017-05-23 11:14:47 +01:00
import os
2018-07-20 16:45:42 +01:00
import time
2017-05-23 11:14:47 +01:00
import re
import asyncio
2018-07-22 11:33:59 +01:00
import sqlite3
import json
2019-08-14 11:28:01 +01:00
2018-07-20 16:45:42 +01:00
from shutil import copy2 as cp
from urllib.parse import urlparse
2019-08-13 17:04:50 +01:00
from collections import namedtuple
import logging
import arrow
2017-05-23 11:14:47 +01:00
import langdetect
import wand.image
import filetype
2018-07-20 16:45:42 +01:00
import jinja2
import yaml
2019-01-15 21:28:58 +00:00
import frontmatter
from feedgen.feed import FeedGenerator
from feedgen.entry import FeedEntry
from slugify import slugify
import requests
2019-03-22 15:49:24 +00:00
from pandoc import PandocMD2HTML, PandocMD2TXT, PandocHTML2TXT
from meta import Exif
2018-07-20 16:45:42 +01:00
import settings
import keys
2019-08-14 11:28:01 +01:00
import wayback
2018-07-20 16:45:42 +01:00
2019-06-25 22:48:04 +01:00
logger = logging.getLogger("NASG")
MarkdownImage = namedtuple(
"MarkdownImage", ["match", "alt", "fname", "title", "css"]
)
2018-07-20 16:45:42 +01:00
RE_MDIMG = re.compile(
2019-06-25 22:48:04 +01:00
r"(?P<match>!\[(?P<alt>[^\]]+)?\]\((?P<fname>[^\s\]]+)"
r"(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:{(?P<css>[^\}]+)\})?)",
re.IGNORECASE,
2018-07-20 16:45:42 +01:00
)
RE_CODE = re.compile(r"^(?:[~`]{3,4}).+$", re.MULTILINE)
2019-06-25 22:48:04 +01:00
RE_PRECODE = re.compile(r'<pre class="([^"]+)"><code>')
2018-07-20 16:45:42 +01:00
RE_MYURL = re.compile(
r'(^(%s[^"]+)$|"(%s[^"]+)")'
% (settings.site.url, settings.site.url)
)
2018-07-20 16:45:42 +01:00
def mtime(path):
""" return seconds level mtime or 0 (chomp microsecs) """
if os.path.exists(path):
return int(os.path.getmtime(path))
return 0
def utfyamldump(data):
""" dump YAML with actual UTF-8 chars """
return yaml.dump(
data, default_flow_style=False, indent=4, allow_unicode=True
)
2018-11-19 16:16:52 +00:00
def url2slug(url, limit=200):
""" convert URL to max 200 char ASCII string """
url = re.sub(r"^https?://(?:www)?", "", url)
url = slugify(url, only_ascii=True, lower=True)
return url[:limit]
def rfc3339todt(rfc3339):
""" nice dates for humans """
2019-06-25 22:48:04 +01:00
t = arrow.get(rfc3339).format("YYYY-MM-DD HH:mm ZZZ")
return str(t)
2019-03-22 15:49:24 +00:00
def extractlicense(url):
""" extract license name """
n, e = os.path.splitext(os.path.basename(url))
return n.upper()
def relurl(text, baseurl=None):
if not baseurl:
baseurl = settings.site.url
for match, standalone, href in RE_MYURL.findall(text):
needsquotes = False
if len(href):
needsquotes = True
url = href
else:
url = standalone
r = os.path.relpath(url, baseurl)
2019-06-25 22:48:04 +01:00
if url.endswith("/") and not r.endswith("/"):
r = "%s/%s" % (r, settings.filenames.html)
if needsquotes:
r = '"%s"' % r
logger.debug("RELURL: %s => %s (base: %s)", match, r, baseurl)
text = text.replace(match, r)
return text
def writepath(fpath, content, mtime=0):
""" f.write with extras """
d = os.path.dirname(fpath)
if not os.path.isdir(d):
2019-06-25 22:48:04 +01:00
logger.debug("creating directory tree %s", d)
os.makedirs(d)
if isinstance(content, str):
2019-06-25 22:48:04 +01:00
mode = "wt"
else:
2019-06-25 22:48:04 +01:00
mode = "wb"
with open(fpath, mode) as f:
2019-06-25 22:48:04 +01:00
logger.info("writing file %s", fpath)
f.write(content)
if mtime > 0:
os.utime(fpath, (mtime, mtime))
def maybe_copy(source, target):
""" copy only if target mtime is smaller, than source mtime """
if os.path.exists(target) and mtime(source) <= mtime(target):
return
logger.info("copying '%s' to '%s'", source, target)
cp(source, target)
def extractdomain(url):
url = urlparse(url)
2019-08-14 11:28:01 +01:00
return url.hostname
J2 = jinja2.Environment(
loader=jinja2.FileSystemLoader(
searchpath=settings.paths.get("tmpl")
),
lstrip_blocks=True,
trim_blocks=True,
)
J2.filters["relurl"] = relurl
J2.filters["url2slug"] = url2slug
J2.filters["printdate"] = rfc3339todt
J2.filters["extractlicense"] = extractlicense
J2.filters["extractdomain"] = extractdomain
class cached_property(object):
""" extermely simple cached_property decorator:
whenever something is called as @cached_property, on first run, the
result is calculated, then the class method is overwritten to be
a property, contaning the result from the method
"""
def __init__(self, method, name=None):
self.method = method
self.name = name or method.__name__
def __get__(self, inst, cls):
if inst is None:
return self
result = self.method(inst)
setattr(inst, self.name, result)
return result
class AQ:
""" Async queue which starts execution right on population """
def __init__(self):
self.loop = asyncio.get_event_loop()
self.queue = asyncio.Queue(loop=self.loop)
def put(self, task):
self.queue.put(asyncio.ensure_future(task))
async def consume(self):
while not self.queue.empty():
item = await self.queue.get()
self.queue.task_done()
# asyncio.gather() ?
def run(self):
consumer = asyncio.ensure_future(self.consume())
self.loop.run_until_complete(consumer)
class Gone(object):
"""
Gone object for delete entries
"""
def __init__(self, fpath):
self.fpath = fpath
@property
def mtime(self):
return mtime(self.fpath)
@property
def exists(self):
if (
os.path.exists(self.renderfile)
and mtime(self.renderfile) >= self.mtime
):
return True
return False
2019-05-22 20:54:10 +01:00
@property
def renderdir(self):
return os.path.join(settings.paths.get("build"), self.source)
@property
def renderfile(self):
return os.path.join(self.renderdir, settings.filenames.html)
2019-05-22 20:54:10 +01:00
@property
def source(self):
source, fext = os.path.splitext(os.path.basename(self.fpath))
return source
2019-05-22 20:54:10 +01:00
@property
def template(self):
return "%s.j2.html" % (self.__class__.__name__)
@property
def tmplvars(self):
return {"source": self.source}
async def render(self):
if self.exists:
return
logger.info(
"rendering %s to %s", self.__class__, self.renderfile
)
writepath(
self.renderfile, J2.get_template(self.template).render()
)
class FediverseStats(object):
def __init__(self, postcount=0, commentcount=0):
self.postcount = postcount
self.commentcount = commentcount
self.nodeinfo_json = "nodeinfo.json"
async def render(self):
writepath(
os.path.join(
settings.paths.build, ".well-known", "nodeinfo"
),
json.dumps(self.metanodeinfo, indent=4, ensure_ascii=False),
)
writepath(
os.path.join(
settings.paths.build, ".well-known", self.nodeinfo_json
),
json.dumps(self.nodeinfo, indent=4, ensure_ascii=False),
)
@property
def metanodeinfo(self):
return {
"links": [
{
"href": f"{settings.site.url}/.well-known/{self.nodeinfo_json}",
"rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
}
]
}
@property
def nodeinfo(self):
return {
"metadata": {
"nodeName": settings.site.name,
"software": {
"homepage": "https://github.com/petermolnar/nasg",
"github": "https://github.com/petermolnar/nasg",
"follow": f"{settings.site.url}",
},
},
"openRegistrations": False,
"protocols": ["webmention", "activitypub"],
"services": {
"inbound": [],
"outbound": ["github", "flickr"],
},
"software": {
"name": "nasg",
"version": "6.6"
},
"usage": {
"localPosts": self.postcount,
"localComments": self.commentcount,
"users": {
"total": 1,
"activeHalfyear": 1,
"activeMonth": 1,
},
},
"version": "2.0",
}
class Redirect(Gone):
"""
Redirect object for entries that moved
"""
@cached_property
def target(self):
target = ""
with open(self.fpath, "rt") as f:
target = f.read().strip()
return target
@property
def tmplvars(self):
return {"source": self.source, "target": self.target}
2019-06-25 22:48:04 +01:00
2018-07-20 16:45:42 +01:00
class MarkdownDoc(object):
""" Base class for anything that is stored as .md """
2019-06-25 22:48:04 +01:00
def __init__(self, fpath):
self.fpath = fpath
2019-01-15 21:28:58 +00:00
@property
def mtime(self):
return mtime(self.fpath)
2019-01-15 21:28:58 +00:00
@property
def dt(self):
""" returns an arrow object; tries to get the published date of the
markdown doc. The pubdate can be in the future, which is why it's
done the way it is """
maybe = arrow.get(self.mtime)
2019-06-25 22:48:04 +01:00
for key in ["published", "date"]:
2019-01-15 21:28:58 +00:00
t = self.meta.get(key, None)
2019-06-25 22:48:04 +01:00
if t and "null" != t:
2019-01-15 21:28:58 +00:00
try:
t = arrow.get(t)
if t.timestamp > maybe.timestamp:
maybe = t
2019-01-15 21:28:58 +00:00
except Exception as e:
logger.error(
"failed to parse date: %s for key %s in %s",
t,
key,
self.fpath,
2019-01-15 21:28:58 +00:00
)
continue
2019-01-15 21:28:58 +00:00
return maybe
@cached_property
2018-07-20 16:45:42 +01:00
def _parsed(self):
2019-06-25 22:48:04 +01:00
with open(self.fpath, mode="rt") as f:
logger.debug("parsing YAML+MD file %s", self.fpath)
2019-01-15 21:28:58 +00:00
meta, txt = frontmatter.parse(f.read())
2019-06-25 22:48:04 +01:00
return (meta, txt)
2019-01-15 21:28:58 +00:00
2019-03-22 15:49:24 +00:00
@cached_property
2018-07-20 16:45:42 +01:00
def meta(self):
return self._parsed[0]
2017-06-12 15:40:30 +01:00
2019-03-22 15:49:24 +00:00
@cached_property
2018-07-20 16:45:42 +01:00
def content(self):
maybe = self._parsed[1]
if not maybe or not len(maybe):
maybe = str("")
return maybe
2017-06-12 15:40:30 +01:00
@cached_property
2018-07-20 16:45:42 +01:00
def html_content(self):
if not len(self.content):
return self.content
2019-03-22 15:49:24 +00:00
c = self.content
2019-06-25 22:48:04 +01:00
if hasattr(self, "images") and len(self.images):
2018-07-20 16:45:42 +01:00
for match, img in self.images.items():
if self.is_photo:
c = c.replace(match, "")
else:
c = c.replace(match, str(img))
2019-03-22 15:49:24 +00:00
c = str(PandocMD2HTML(c))
c = RE_PRECODE.sub(
'<pre><code lang="\g<1>" class="language-\g<1>">', c
)
2019-03-22 15:49:24 +00:00
return c
@cached_property
def txt_content(self):
if not len(self.content):
return ""
2018-07-20 16:45:42 +01:00
else:
return PandocMD2TXT(self.content)
class Comment(MarkdownDoc):
@property
def dt(self):
maybe = self.meta.get("date")
if not maybe or maybe == "null":
maybe = int(os.path.basename(self.fpath).split("-")[0])
dt = arrow.get(maybe)
if self.mtime != dt.timestamp:
os.utime(self.fpath, (dt.timestamp, dt.timestamp))
return dt
@property
2018-07-20 16:45:42 +01:00
def source(self):
2019-06-25 22:48:04 +01:00
return self.meta.get("source")
@property
2018-07-20 16:45:42 +01:00
def author(self):
r = {
"@context": "http://schema.org",
"@type": "Person",
2019-06-25 22:48:04 +01:00
"name": urlparse(self.source).hostname,
"url": self.source,
2018-07-20 16:45:42 +01:00
}
2019-06-25 22:48:04 +01:00
author = self.meta.get("author")
2018-07-20 16:45:42 +01:00
if not author:
return r
2019-06-25 22:48:04 +01:00
if "name" in author:
r.update({"name": self.meta.get("author").get("name")})
elif "url" in author:
r.update(
{
"name": urlparse(
self.meta.get("author").get("url")
).hostname
}
)
2018-07-20 16:45:42 +01:00
return r
2017-10-28 19:08:40 +01:00
@property
2018-07-20 16:45:42 +01:00
def type(self):
2019-06-25 22:48:04 +01:00
return self.meta.get("type", "webmention")
@cached_property
def jsonld(self):
r = {
"@context": "http://schema.org",
"@type": "Comment",
"author": self.author,
"url": self.source,
2019-06-25 22:48:04 +01:00
"discussionUrl": self.meta.get("target"),
"datePublished": str(self.dt),
2019-06-25 22:48:04 +01:00
"disambiguatingDescription": self.type,
}
return r
2019-08-14 11:28:01 +01:00
class WebImage(object):
def __init__(self, fpath, mdimg, parent):
logger.debug("loading image: %s", fpath)
self.mdimg = mdimg
2018-07-20 16:45:42 +01:00
self.fpath = fpath
self.parent = parent
self.mtime = mtime(self.fpath)
self.name = os.path.basename(self.fpath)
self.fname, self.fext = os.path.splitext(self.name)
self.resized_images = [
(k, self.Resized(self, k))
for k in settings.photo.get("sizes").keys()
if k < max(self.width, self.height)
]
if not len(self.resized_images):
self.resized_images.append(
(
max(self.width, self.height),
self.Resized(self, max(self.width, self.height)),
)
)
@property
def is_mainimg(self):
if self.fname == self.parent.name:
return True
return False
@property
def jsonld(self):
r = {
"@context": "http://schema.org",
"@type": "ImageObject",
"url": self.href,
"image": self.href,
"thumbnail": settings.nameddict(
{
"@context": "http://schema.org",
"@type": "ImageObject",
"url": self.src,
"width": self.displayed.width,
"height": self.displayed.height,
}
),
"name": self.name,
"encodingFormat": self.mime_type,
"contentSize": self.mime_size,
"width": self.linked.width,
"height": self.linked.height,
"dateCreated": self.exif.get("CreateDate"),
"exifData": [],
"caption": self.caption,
"headline": self.title,
"representativeOfPage": False,
# "text": str(self)
}
for k, v in self.exif.items():
r["exifData"].append(
{"@type": "PropertyValue", "name": k, "value": v}
)
if self.is_photo:
licence = settings.licence["_default"]
r.update(
{
"creator": settings.author,
"copyrightHolder": settings.author,
"license": f"https://spdx.org/licenses/{licence}.html",
}
)
if self.is_mainimg:
r.update({"representativeOfPage": True})
if (
self.exif["GPSLatitude"] != 0
and self.exif["GPSLongitude"] != 0
):
r.update(
{
"locationCreated": settings.nameddict(
{
"@context": "http://schema.org",
"@type": "Place",
"geo": settings.nameddict(
{
"@context": "http://schema.org",
"@type": "GeoCoordinates",
"latitude": round(
self.exif["GPSLatitude"], 4
),
"longitude": round(
self.exif["GPSLongitude"], 4
),
}
),
}
)
}
)
return settings.nameddict(r)
def __str__(self):
if len(self.mdimg.css):
return self.mdimg.match
tmpl = J2.get_template("%s.j2.html" % (self.__class__.__name__))
return tmpl.render(self.jsonld)
@cached_property
def meta(self):
return Exif(self.fpath)
2017-05-23 11:14:47 +01:00
@property
def caption(self):
if len(self.mdimg.alt):
return self.mdimg.alt
else:
return self.meta.get("Description", "")
2019-06-25 22:48:04 +01:00
@property
def title(self):
if len(self.mdimg.title):
return self.mdimg.title
else:
return self.meta.get("Headline", self.fname)
@property
def tags(self):
return list(set(self.meta.get("Subject", [])))
@property
def published(self):
return arrow.get(
self.meta.get("ReleaseDate", self.meta.get("ModifyDate"))
)
@property
def width(self):
return int(self.meta.get("ImageWidth"))
2019-01-15 21:28:58 +00:00
@property
def height(self):
return int(self.meta.get("ImageHeight"))
2019-01-15 21:28:58 +00:00
@property
def mime_type(self):
return str(self.meta.get("MIMEType", "image/jpeg"))
@property
def mime_size(self):
try:
size = os.path.getsize(self.linked.fpath)
except Exception as e:
logger.error(
"Failed to get mime size of %s", self.linked.fpath
)
size = self.meta.get("FileSize", 0)
return size
@property
def displayed(self):
ret = self.resized_images[0][1]
for size, r in self.resized_images:
if size == settings.photo.get("default"):
ret = r
return ret
2017-06-12 15:17:29 +01:00
@property
def linked(self):
m = 0
ret = self.resized_images[0][1]
for size, r in self.resized_images:
if size > m:
m = size
ret = r
return ret
2017-05-23 11:14:47 +01:00
@property
def src(self):
return self.displayed.url
2018-06-17 18:30:50 +01:00
@property
def href(self):
return self.linked.url
2018-06-17 18:30:50 +01:00
@property
2018-07-20 16:45:42 +01:00
def is_photo(self):
r = settings.photo.get("re_author", None)
if not r:
return False
cpr = self.meta.get("Copyright", "")
art = self.meta.get("Artist", "")
# both Artist and Copyright missing from EXIF
if not cpr and not art:
return False
# we have regex, Artist and Copyright, try matching them
if r.search(cpr) or r.search(art):
2018-07-20 16:45:42 +01:00
return True
return False
2017-05-23 11:14:47 +01:00
@property
def exif(self):
exif = {
"Model": "",
"FNumber": "",
"ExposureTime": "",
"FocalLength": "",
"ISO": "",
"LensID": "",
"CreateDate": str(arrow.get(self.mtime)),
"GPSLatitude": 0,
"GPSLongitude": 0,
}
if not self.is_photo:
return exif
mapping = {
"Model": ["Model"],
"FNumber": ["FNumber", "Aperture"],
"ExposureTime": ["ExposureTime"],
2019-08-14 11:28:01 +01:00
"FocalLength": ["FocalLength"],
"ISO": ["ISO"],
"LensID": ["LensID", "LensSpec", "Lens"],
"CreateDate": ["CreateDate", "DateTimeOriginal"],
"GPSLatitude": ["GPSLatitude"],
"GPSLongitude": ["GPSLongitude"],
}
2019-03-22 15:49:24 +00:00
for ekey, candidates in mapping.items():
for candidate in candidates:
maybe = self.meta.get(candidate, None)
if not maybe:
continue
else:
exif[ekey] = maybe
break
return settings.nameddict(exif)
2019-03-22 15:49:24 +00:00
def _maybe_watermark(self, img):
if not self.is_photo:
return img
2017-10-27 15:56:05 +01:00
wmarkfile = settings.paths.get("watermark")
if not os.path.exists(wmarkfile):
return img
2017-10-27 15:56:05 +01:00
with wand.image.Image(filename=wmarkfile) as wmark:
w = self.height * 0.2
h = wmark.height * (w / wmark.width)
if self.width > self.height:
x = self.width - w - (self.width * 0.01)
y = self.height - h - (self.height * 0.01)
else:
x = self.width - h - (self.width * 0.01)
y = self.height - w - (self.height * 0.01)
2017-10-27 15:56:05 +01:00
w = round(w)
h = round(h)
x = round(x)
y = round(y)
2017-06-12 15:40:30 +01:00
wmark.resize(w, h)
if self.width <= self.height:
wmark.rotate(-90)
img.composite(image=wmark, left=x, top=y)
return img
async def downsize(self):
need = False
for size, resized in self.resized_images:
if not resized.exists or settings.args.get("regenerate"):
need = True
break
if not need:
return
2017-06-12 15:40:30 +01:00
with wand.image.Image(filename=self.fpath) as img:
img.auto_orient()
img = self._maybe_watermark(img)
for size, resized in self.resized_images:
if not resized.exists or settings.args.get(
"regenerate"
):
logger.info(
"resizing image: %s to size %d",
os.path.basename(self.fpath),
size,
)
await resized.make(img)
2017-06-12 15:40:30 +01:00
class Resized:
def __init__(self, parent, size, crop=False):
self.parent = parent
self.size = size
self.crop = crop
2017-05-23 11:14:47 +01:00
2019-08-14 11:28:01 +01:00
# @property
# def data(self):
# with open(self.fpath, "rb") as f:
# encoded = base64.b64encode(f.read())
# return "data:%s;base64,%s" % (
# self.parent.mime_type,
# encoded.decode("utf-8"),
# )
@property
def suffix(self):
return settings.photo.get("sizes").get(self.size, "")
2017-06-02 11:19:55 +01:00
@property
def fname(self):
return "%s%s%s" % (
self.parent.fname,
self.suffix,
self.parent.fext,
2019-06-25 22:48:04 +01:00
)
@property
def fpath(self):
return os.path.join(
self.parent.parent.renderdir, self.fname
)
@property
def url(self):
return "%s/%s/%s" % (
settings.site.get("url"),
self.parent.parent.name,
"%s%s%s"
% (self.parent.fname, self.suffix, self.parent.fext),
2019-06-25 22:48:04 +01:00
)
@property
def relpath(self):
return "%s/%s" % (
self.parent.parent.renderdir.replace(
settings.paths.get("build"), ""
),
self.fname,
2019-06-25 22:48:04 +01:00
)
2017-06-12 15:40:30 +01:00
@property
def exists(self):
if os.path.isfile(self.fpath):
if mtime(self.fpath) >= self.parent.mtime:
return True
return False
@property
def width(self):
return self.dimensions[0]
@property
def height(self):
return self.dimensions[1]
@property
def dimensions(self):
width = self.parent.width
height = self.parent.height
size = self.size
ratio = max(width, height) / min(width, height)
horizontal = True if (width / height) >= 1 else False
# panorama: reverse "horizontal" because the limit should be on
# the shorter side, not the longer, and make it a bit smaller, than
# the actual limit
# 2.39 is the wide angle cinematic view: anything wider, than that
# is panorama land
if ratio > 2.4 and not self.crop:
size = int(size * 0.6)
horizontal = not horizontal
if (horizontal and not self.crop) or (
not horizontal and self.crop
):
w = size
h = int(float(size / width) * height)
else:
h = size
w = int(float(size / height) * width)
return (w, h)
2017-06-12 15:40:30 +01:00
async def make(self, original):
if not os.path.isdir(os.path.dirname(self.fpath)):
os.makedirs(os.path.dirname(self.fpath))
with original.clone() as thumb:
thumb.resize(self.width, self.height)
2017-06-12 15:40:30 +01:00
if self.crop:
thumb.liquid_rescale(self.size, self.size, 1, 1)
if (
self.parent.meta.get("FileType", "jpeg").lower()
== "jpeg"
):
thumb.compression_quality = 88
thumb.unsharp_mask(
radius=1, sigma=0.5, amount=0.7, threshold=0.5
)
thumb.format = "pjpeg"
# this is to make sure pjpeg happens
with open(self.fpath, "wb") as f:
logger.info("writing %s", self.fpath)
thumb.save(file=f)
2018-04-30 20:44:04 +01:00
class Singular(MarkdownDoc):
"""
A Singular object: a complete representation of a post, including
all it's comments, files, images, etc
"""
def __init__(self, fpath):
self.fpath = fpath
self.dirpath = os.path.dirname(fpath)
self.name = os.path.basename(self.dirpath)
self.category = os.path.basename(os.path.dirname(self.dirpath))
@cached_property
def files(self):
"""
An array of files present at the same directory level as
the Singular object, excluding hidden (starting with .) and markdown
(ending with .md) files
"""
return [
k
for k in glob.glob(os.path.join(self.dirpath, "*.*"))
if not k.startswith(".")
]
@cached_property
def comments(self):
"""
An dict of Comment objects keyed with their path, populated from the
same directory level as the Singular objects
"""
comments = {}
canditates = glob.glob(os.path.join(self.dirpath, "*.md"))
for candidate in canditates:
if os.path.basename(candidate) == settings.filenames.md:
continue
if candidate.startswith("."):
continue
comment = Comment(candidate)
comments[comment.dt.timestamp] = comment
return comments
@cached_property
def images(self):
"""
A dict of WebImage objects, populated by:
- images that are present in the Markdown content
- and have an actual image file at the same directory level as
the Singular object
"""
images = {}
2019-08-14 11:28:01 +01:00
for match, alt, fname, title, css in RE_MDIMG.findall(
self.content
):
mdimg = MarkdownImage(match, alt, fname, title, css)
imgpath = os.path.join(self.dirpath, fname)
if imgpath in self.files:
kind = filetype.guess(imgpath)
if kind and "image" in kind.mime.lower():
images.update(
{match: WebImage(imgpath, mdimg, self)}
)
else:
logger.error(
"Missing image: %s, referenced in %s",
imgpath,
self.fpath,
)
continue
return images
@property
def summary(self):
return str(self.meta.get("summary", ""))
@cached_property
def html_summary(self):
if not len(self.summary):
return ""
else:
return PandocMD2HTML(self.summary)
@cached_property
def txt_summary(self):
if not len(self.summary):
return ""
else:
return PandocMD2TXT(self.summary)
@property
def published(self):
# ok, so here's a hack: because I have no idea when my older photos
# were actually published, any photo from before 2014 will have
# the EXIF createdate as publish date
pub = arrow.get(self.meta.get("published"))
if self.is_photo:
maybe = arrow.get(self.photo.exif.get("CreateDate"))
if maybe.year < settings.photo.earlyyears:
pub = maybe
return pub
@property
def updated(self):
if "updated" in self.meta:
return arrow.get(self.meta.get("updated"))
else:
return self.dt
@property
def sameas(self):
r = {}
for k in glob.glob(os.path.join(self.dirpath, "*.copy")):
with open(k, "rt") as f:
r.update({f.read(): True})
return list(r.keys())
@property
def is_page(self):
""" all the categories starting with _ are pages """
if self.category.startswith("_"):
return True
return False
@property
def is_front(self):
if self.category in settings.notinfeed:
return False
return True
@property
def is_photo(self):
"""
This is true if there is a file, with the same name as the entry's
directory - so, it's slug -, and that that image believes it's a a
photo.
"""
if len(self.images) != 1:
return False
photo = next(iter(self.images.values()))
maybe = self.fpath.replace(
settings.filenames.md, "%s.jpg" % (self.name)
)
if photo.fpath == maybe:
return True
return False
@property
def is_reply(self):
return self.meta.get("in-reply-to", False)
2019-06-25 22:48:04 +01:00
@property
def is_future(self):
if self.published.timestamp > arrow.utcnow().timestamp:
return True
return False
2019-01-15 21:28:58 +00:00
@property
def photo(self):
if not self.is_photo:
return None
return next(iter(self.images.values()))
2019-01-15 21:28:58 +00:00
@property
def title(self):
if self.is_reply:
return "RE: %s" % self.is_reply
return self.meta.get(
"title", self.published.format(settings.displaydate)
)
2019-01-15 21:28:58 +00:00
@property
def tags(self):
return self.meta.get("tags", [])
def baseN(
self, num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"
):
"""
Creates short, lowercase slug for a number (an epoch) passed
"""
num = int(num)
return ((num == 0) and numerals[0]) or (
self.baseN(num // b, b, numerals).lstrip(numerals[0])
+ numerals[num % b]
)
2019-01-15 21:28:58 +00:00
@property
def shortslug(self):
return self.baseN(self.published.timestamp)
@property
def to_syndicate(self):
urls = self.meta.get("syndicate", [])
if not self.is_page:
urls.append("https://fed.brid.gy/")
urls.append("https://brid.gy/publish/mastodon")
if self.is_photo:
urls.append("https://brid.gy/publish/flickr")
return urls
@property
def to_ping(self):
webmentions = []
for url in self.to_syndicate:
w = Webmention(
self.url,
url,
os.path.dirname(self.fpath),
2019-08-14 11:28:01 +01:00
self.dt.timestamp,
)
webmentions.append(w)
if self.is_reply:
w = Webmention(
self.url,
self.is_reply,
os.path.dirname(self.fpath),
2019-08-14 11:28:01 +01:00
self.dt.timestamp,
)
webmentions.append(w)
return webmentions
2019-01-15 21:28:58 +00:00
@property
def licence(self):
k = "_default"
if self.category in settings.licence:
k = self.category
return settings.licence[k]
2019-01-15 21:28:58 +00:00
@property
def lang(self):
lang = "en"
try:
lang = langdetect.detect(
"\n".join([self.meta.get("title", ""), self.content])
2019-06-25 22:48:04 +01:00
)
except BaseException:
pass
return lang
@property
def url(self):
return "%s/%s/" % (settings.site.get("url"), self.name)
@property
def has_code(self):
if RE_CODE.search(self.content):
return True
else:
return False
@cached_property
def review(self):
if "review" not in self.meta:
return False
review = self.meta.get("review")
rated, outof = review.get("rating").split("/")
r = {
"@context": "https://schema.org/",
"@type": "Review",
"reviewRating": {
"@type": "Rating",
"@context": "http://schema.org",
"ratingValue": rated,
"bestRating": outof,
"worstRating": 1,
},
"name": review.get("title"),
"text": review.get("summary"),
"url": review.get("url"),
"author": settings.author,
}
return r
@cached_property
def event(self):
if "event" not in self.meta:
return False
event = self.meta.get("event", {})
r = {
"@context": "http://schema.org",
"@type": "Event",
"endDate": str(arrow.get(event.get("end"))),
"startDate": str(arrow.get(event.get("start"))),
"location": {
"@context": "http://schema.org",
"@type": "Place",
"address": event.get("location"),
"name": event.get("location"),
},
"name": self.title,
}
return r
@cached_property
def jsonld(self):
r = {
"@context": "http://schema.org",
"@type": "Article",
"@id": self.url,
"inLanguage": self.lang,
"headline": self.title,
"url": self.url,
"genre": self.category,
"mainEntityOfPage": f"{self.url}#article",
"dateModified": str(self.dt),
"datePublished": str(self.published),
"copyrightYear": str(self.published.format("YYYY")),
"license": f"https://spdx.org/licenses/{self.licence}.html",
"image": settings.site.image,
"author": settings.author,
"sameAs": self.sameas,
"publisher": settings.site.publisher,
"name": self.name,
"text": self.html_content,
"description": self.html_summary,
"potentialAction": [],
"comment": [],
"commentCount": len(self.comments.keys()),
"keywords": self.tags,
}
if self.is_photo:
r.update({"@type": "Photograph"})
elif self.has_code:
r.update({"@type": "TechArticle"})
elif self.is_page:
r.update({"@type": "WebPage"})
if len(self.images):
r["image"] = []
for img in list(self.images.values()):
imgld = img.jsonld
imgld["text"] = str(img)
r["image"].append(imgld)
if self.is_reply:
2019-06-25 22:48:04 +01:00
r.update(
{
"mentions": {
"@context": "http://schema.org",
"@type": "Thing",
"url": self.is_reply,
}
2019-06-25 22:48:04 +01:00
}
)
2018-06-08 10:14:39 +01:00
if self.review:
r.update({"review": self.review})
2017-06-02 11:19:55 +01:00
if self.event:
r.update({"subjectOf": self.event})
2017-06-12 15:40:30 +01:00
for url in list(set(self.to_syndicate)):
r["potentialAction"].append(
{
"@context": "http://schema.org",
"@type": "InteractAction",
"target": url,
}
)
2017-06-12 15:40:30 +01:00
for mtime in sorted(self.comments.keys()):
r["comment"].append(self.comments[mtime].jsonld)
2017-06-02 11:19:55 +01:00
return settings.nameddict(r)
2018-04-30 20:44:04 +01:00
@property
def template(self):
return f"{self.__class__.__name__}.j2.html"
2018-04-30 20:44:04 +01:00
@property
def txttemplate(self):
return f"{self.__class__.__name__}.j2.txt"
2017-06-02 11:19:55 +01:00
2018-07-20 16:45:42 +01:00
@property
def renderdir(self):
return os.path.join(settings.paths.get("build"), self.name)
2017-06-12 15:40:30 +01:00
@property
def renderfile(self):
return os.path.join(self.renderdir, settings.filenames.html)
2017-05-23 11:14:47 +01:00
@property
def txtfile(self):
return os.path.join(self.renderdir, settings.filenames.txt)
@property
def exists(self):
if settings.args.get("force"):
logger.debug("rendering required: force mode on")
return False
maybe = self.dt.timestamp
if len(self.files):
for f in self.files:
maybe = max(maybe, mtime(f))
for f in [self.renderfile, self.txtfile]:
if not os.path.exists(f):
logger.debug(f"rendering required: no {f} yet")
return False
elif maybe > mtime(f):
logger.debug(f"rendering required: self.dt > {f} mtime")
return False
logger.debug("rendering not required")
return True
2017-06-12 15:40:30 +01:00
2017-06-12 15:17:29 +01:00
@property
def corpus(self):
2019-08-14 11:28:01 +01:00
return "\n".join(
[self.title, self.name, self.summary, self.content]
)
2018-07-20 16:45:42 +01:00
async def copy_files(self):
exclude = [
".md",
".jpg",
".png",
".gif",
".ping",
".url",
".del",
".copy",
".cache",
]
files = glob.glob(
os.path.join(os.path.dirname(self.fpath), "*.*")
)
for f in files:
fname, fext = os.path.splitext(f)
if fext.lower() in exclude:
continue
2018-07-20 16:45:42 +01:00
t = os.path.join(
settings.paths.get("build"),
self.name,
os.path.basename(f),
2019-06-25 22:48:04 +01:00
)
if os.path.exists(t) and mtime(f) <= mtime(t):
continue
logger.info("copying '%s' to '%s'", f, t)
cp(f, t)
@property
def has_archive(self):
return len(
glob.glob(os.path.join(self.dirpath, f"*archiveorg*.copy"))
)
2019-08-14 11:28:01 +01:00
async def get_from_archiveorg(self):
if self.has_archive:
2019-08-14 11:28:01 +01:00
return
if self.is_future:
return
if (
self.published.timestamp + 86400
) > arrow.utcnow().timestamp:
return
logger.info("archive.org .copy is missing for %s", self.name)
if len(self.category) and not (
settings.args.get("noservices")
or settings.args.get("offline")
):
wb = wayback.FindWaybackURL(self.name, self.category)
2019-08-14 11:28:01 +01:00
wb.run()
if len(wb.oldest):
archiveurl = url2slug(wb.oldest)
t = os.path.join(self.dirpath, f"{archiveurl}.copy")
writepath(t, wb.oldest)
del wb
async def render(self):
await self.get_from_archiveorg()
2019-08-14 11:28:01 +01:00
if self.exists:
return True
logger.info("rendering %s", self.name)
v = {
"baseurl": self.url,
"post": self.jsonld,
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
2019-08-14 11:28:01 +01:00
"fnames": settings.filenames,
}
writepath(
self.renderfile, J2.get_template(self.template).render(v)
)
del v
2018-07-20 16:45:42 +01:00
g = {
"post": self.jsonld,
"summary": self.txt_summary,
"content": self.txt_content,
}
writepath(
2019-08-14 11:28:01 +01:00
self.txtfile, J2.get_template(self.txttemplate).render(g)
)
del g
2018-07-20 16:45:42 +01:00
class Home(Singular):
def __init__(self, fpath):
super().__init__(fpath)
self.cdata = {}
self.pdata = {}
2018-07-20 16:45:42 +01:00
def add(self, category, post):
if not len(category.name):
return
if category.name not in self.cdata:
self.cdata[category.name] = category
if category.name not in self.pdata:
self.pdata[category.name] = post
else:
current = arrow.get(self.pdata[category.name].datePublished)
if current > post.published:
return
else:
self.pdata[category.name] = post
return
@property
def posts(self):
flattened = []
order = {}
for cname, post in self.pdata.items():
order[post.published.timestamp] = cname
for mtime in sorted(order.keys(), reverse=True):
category = self.cdata[order[mtime]].ctmplvars
post = self.pdata[order[mtime]].jsonld
flattened.append((category, post))
return flattened
2017-06-12 15:40:30 +01:00
@property
def renderdir(self):
return settings.paths.get("build")
2017-06-12 15:40:30 +01:00
@property
def renderfile(self):
return os.path.join(
settings.paths.get("build"), settings.filenames.html
)
@property
def dt(self):
ts = 0
for cat, post in self.posts:
ts = max(ts, arrow.get(post["dateModified"]).timestamp)
return arrow.get(ts)
2017-10-27 15:56:05 +01:00
async def render_gopher(self):
2019-08-14 11:28:01 +01:00
lines = ["%s's gopherhole" % (settings.site.name), "", ""]
2017-10-27 15:56:05 +01:00
for category, post in self.posts:
line = "1%s\t/%s/%s\t%s\t70" % (
category["name"],
settings.paths.category,
category["name"],
settings.site.name,
)
lines.append(line)
lines.append("")
writepath(
self.renderfile.replace(
settings.filenames.html, settings.filenames.gopher
),
"\r\n".join(lines),
)
2017-10-27 15:56:05 +01:00
async def render(self):
if self.exists:
return
logger.info("rendering %s", self.name)
r = J2.get_template(self.template).render(
{
"baseurl": settings.site.get("url"),
"post": self.jsonld,
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
"posts": self.posts,
"fnames": settings.filenames,
}
)
writepath(self.renderfile, r)
await self.render_gopher()
2017-10-27 15:56:05 +01:00
class PHPFile(object):
@property
def exists(self):
2019-06-25 22:48:04 +01:00
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if self.mtime > mtime(self.renderfile):
return False
return True
@property
def mtime(self):
return mtime(
os.path.join(settings.paths.get("tmpl"), self.templatefile)
)
@property
def renderfile(self):
2019-06-25 22:48:04 +01:00
raise ValueError("Not implemented")
@property
def templatefile(self):
2019-06-25 22:48:04 +01:00
raise ValueError("Not implemented")
async def render(self):
# if self.exists:
2019-06-25 22:48:04 +01:00
# return
await self._render()
class Search(PHPFile):
def __init__(self):
self.fpath = os.path.join(
settings.paths.get("build"), "search.sqlite"
)
self.db = sqlite3.connect(self.fpath)
2019-06-25 22:48:04 +01:00
self.db.execute("PRAGMA auto_vacuum = INCREMENTAL;")
self.db.execute("PRAGMA journal_mode = MEMORY;")
self.db.execute("PRAGMA temp_store = MEMORY;")
self.db.execute("PRAGMA locking_mode = NORMAL;")
self.db.execute("PRAGMA synchronous = FULL;")
self.db.execute('PRAGMA encoding = "UTF-8";')
2019-06-25 22:48:04 +01:00
self.db.execute(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS data USING fts4(
url,
mtime,
name,
title,
category,
content,
notindexed=category,
notindexed=url,
notindexed=mtime,
tokenize=porter
2019-06-25 22:48:04 +01:00
)"""
)
self.is_changed = False
def __exit__(self):
if self.is_changed:
self.db.commit()
2019-06-25 22:48:04 +01:00
self.db.execute("PRAGMA auto_vacuum;")
self.db.close()
def check(self, name):
ret = 0
2019-06-25 22:48:04 +01:00
maybe = self.db.execute(
"""
SELECT
mtime
FROM
data
WHERE
name = ?
2019-06-25 22:48:04 +01:00
""",
(name,),
).fetchone()
if maybe:
ret = int(maybe[0])
return ret
def append(self, post):
mtime = int(post.published.timestamp)
check = self.check(post.name)
2019-06-25 22:48:04 +01:00
if check and check < mtime:
self.db.execute(
"""
DELETE
FROM
data
WHERE
2019-06-25 22:48:04 +01:00
name=?""",
(post.name,),
)
check = False
if not check:
2019-06-25 22:48:04 +01:00
self.db.execute(
"""
INSERT INTO
data
(url, mtime, name, title, category, content)
VALUES
(?,?,?,?,?,?);
2019-06-25 22:48:04 +01:00
""",
(
post.url,
mtime,
post.name,
post.title,
post.category,
post.content,
),
2019-06-25 22:48:04 +01:00
)
self.is_changed = True
@property
def templates(self):
2019-06-25 22:48:04 +01:00
return ["Search.j2.php", "OpenSearch.j2.xml"]
async def _render(self):
for template in self.templates:
2019-06-25 22:48:04 +01:00
r = J2.get_template(template).render(
{
"post": {},
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
}
)
target = os.path.join(
settings.paths.get("build"),
template.replace(".j2", "").lower(),
)
writepath(target, r)
class IndexPHP(PHPFile):
def __init__(self):
self.gone = {}
self.redirect = {}
def add_gone(self, uri):
self.gone[uri] = True
def add_redirect(self, source, target):
if target in self.gone:
self.add_gone(source)
else:
2019-06-25 22:48:04 +01:00
if "://" not in target:
target = "%s/%s" % (settings.site.get("url"), target)
self.redirect[source] = target
@property
def renderfile(self):
2019-06-25 22:48:04 +01:00
return os.path.join(settings.paths.get("build"), "index.php")
@property
def templatefile(self):
2019-06-25 22:48:04 +01:00
return "404.j2.php"
async def _render(self):
2019-06-25 22:48:04 +01:00
r = J2.get_template(self.templatefile).render(
{
"post": {},
"site": settings.site,
"menu": settings.menu,
"gones": self.gone,
"redirects": self.redirect,
"rewrites": settings.rewrites,
"gone_re": settings.gones,
2019-06-25 22:48:04 +01:00
}
)
writepath(self.renderfile, r)
2017-10-27 15:56:05 +01:00
class Micropub(PHPFile):
@property
def renderfile(self):
return os.path.join(settings.paths.get("build"), "micropub.php")
@property
def templatefile(self):
return "%s.j2.php" % (self.__class__.__name__)
async def _render(self):
r = J2.get_template(self.templatefile).render(
{
"wallabag": keys.wallabag,
"site": settings.site,
}
)
writepath(self.renderfile, r)
class WorldMap(object):
def __init__(self):
self.data = {}
self.mtime = 0
def add(self, post):
if not post.is_photo:
return
if not post.photo.jsonld.locationCreated:
return
lat = post.photo.jsonld.locationCreated.geo.latitude
lon = post.photo.jsonld.locationCreated.geo.longitude
k = (lat, lon)
content = f'<p><a href="{post.url}"><img src="{post.photo.src}" style="width: 150px; height: auto" /><br />{post.title}</a></p>'
# d = {"latitude": nlat, "longitude": nlon, "popup": content}
if k in self.data:
self.data[k].append(content)
else:
self.data[k] = [content]
self.mtime = max(post.dt.timestamp, self.mtime)
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
@property
def renderfile(self):
return os.path.join(
settings.paths.get("build"), settings.filenames.worldmap
)
@property
def template(self):
return "%s.j2.html" % (self.__class__.__name__)
@property
def tmplvars(self):
return {
"token": keys.mapbox,
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
"site": settings.site,
"geo": self.data,
}
async def render(self):
if self.exists:
return
logger.info(
"rendering %s to %s", self.__class__, self.renderfile
)
writepath(
self.renderfile,
J2.get_template(self.template).render(self.tmplvars),
)
2018-07-20 16:45:42 +01:00
class Category(dict):
2019-06-25 22:48:04 +01:00
def __init__(self, name=""):
2018-07-20 16:45:42 +01:00
self.name = name
2017-10-27 15:56:05 +01:00
2018-07-20 16:45:42 +01:00
def __setitem__(self, key, value):
if key in self:
raise LookupError(
f"key '{key}' already exists, colliding posts are: {self[key].fpath} vs {value.fpath}"
)
2018-07-20 16:45:42 +01:00
dict.__setitem__(self, key, value)
2018-07-20 16:45:42 +01:00
@property
def title(self):
if len(self.name):
return f"{self.name} - {settings.site.name}"
2018-07-20 16:45:42 +01:00
else:
return settings.site.headline
@property
2018-07-20 16:45:42 +01:00
def url(self):
if len(self.name):
url = f"{settings.site.url}/{settings.paths.category}/{self.name}/"
2018-07-20 16:45:42 +01:00
else:
url = f"{settings.site.url}/"
2018-07-20 16:45:42 +01:00
return url
2017-10-27 15:56:05 +01:00
@property
def feedurl(self):
return f"{self.url}{settings.paths.feed}/"
@property
def sortedkeys(self):
return list(sorted(self.keys(), reverse=True))
2017-10-27 15:56:05 +01:00
@property
def ctmplvars(self):
return {
"name": self.name,
"url": self.url,
"feed": self.feedurl,
"title": self.title,
}
@property
def renderdir(self):
b = settings.paths.build
2018-07-20 16:45:42 +01:00
if len(self.name):
2019-08-14 11:28:01 +01:00
b = os.path.join(b, settings.paths.category, self.name)
return b
@property
def newest_year(self):
return arrow.get(max(self.keys())).format("YYYY")
@cached_property
def years(self):
years = {}
for key in list(sorted(self.keys(), reverse=True)):
year = arrow.get(int(key)).format("YYYY")
if year in years:
continue
if year == self.newest_year:
url = f"{self.url}{settings.filenames.html}"
else:
url = f"{self.url}{year}/{settings.filenames.html}"
years.update({year: url})
return years
async def render_feeds(self):
await self.AtomFeed(self).render()
await self.RSSFeed(self).render()
await self.JSONFeed(self).render()
async def render(self):
await self.render_feeds()
await self.Gopher(self).render()
if self.name in settings.flat:
await self.Flat(self).render()
2019-04-09 21:34:03 +01:00
else:
for year in sorted(self.years.keys()):
await self.Year(self, year).render()
class JSONFeed(object):
def __init__(self, parent):
self.parent = parent
@property
def mtime(self):
2019-08-14 11:28:01 +01:00
return max(
list(sorted(self.parent.keys(), reverse=True))[
0 : settings.pagination
]
)
@property
def renderfile(self):
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir,
settings.paths.feed,
settings.filenames.json,
)
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
async def render(self):
if self.exists:
2019-08-14 11:28:01 +01:00
logger.debug(
"category %s is up to date", self.parent.name
)
return
2019-08-14 11:28:01 +01:00
logger.info(
"rendering JSON feed for category %s", self.parent.name
)
2019-01-15 21:28:58 +00:00
js = {
"version": "https://jsonfeed.org/version/1",
"title": self.parent.title,
"home_page_url": settings.site.url,
"feed_url": f"{self.parent.url}{settings.filenames.json}",
"author": {
"name": settings.author.name,
"url": settings.author.url,
"avatar": settings.author.image,
},
"items": [],
}
2019-08-14 11:28:01 +01:00
for key in list(sorted(self.parent.keys(), reverse=True))[
0 : settings.pagination
]:
post = self.parent[key]
pjs = {
"id": post.url,
"content_text": post.txt_content,
"content_html": post.html_content,
"url": post.url,
"date_published": str(post.published),
}
if len(post.summary):
pjs.update({"summary": post.txt_summary})
if post.is_photo:
pjs.update(
{
"attachment": {
"url": post.photo.href,
"mime_type": post.photo.mime_type,
2019-08-14 11:28:01 +01:00
"size_in_bytes": f"{post.photo.mime_size}",
}
}
)
js["items"].append(pjs)
2019-08-14 11:28:01 +01:00
writepath(
self.renderfile,
json.dumps(js, indent=4, ensure_ascii=False),
)
class XMLFeed(object):
def __init__(self, parent):
self.parent = parent
def init_entry(self, post):
fe = FeedEntry()
fe.id(post.url)
fe.title(post.title)
fe.published(post.published.datetime)
fe.updated(arrow.get(post.dt).datetime)
lname = post.licence.upper()
lyear = post.published.format("YYYY")
fe.rights(f"{lname} {settings.author.name} {lyear}")
fe.author(
{
"name": settings.author.name,
"email": settings.author.email,
}
)
categories = []
for tag in set(post.tags):
categories.append(
{
"term": tag
# "scheme": "https://schema.org/keywords"
}
)
fe.category(categories)
if post.is_photo:
fe.enclosure(
post.photo.href,
"%d" % post.photo.mime_size,
post.photo.mime_type,
)
return fe
@property
def rkeys(self):
rkeys = list(sorted(self.parent.keys(), reverse=True))
rkeys = rkeys[0 : settings.pagination]
rkeys = list(sorted(rkeys, reverse=False))
return rkeys
@property
def mtime(self):
2019-08-14 11:28:01 +01:00
return max(
list(sorted(self.parent.keys(), reverse=True))[
0 : settings.pagination
]
)
@property
def renderfile(self):
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir, settings.paths.feed, "index.xml"
2019-08-14 11:28:01 +01:00
)
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
def uptodate(self):
logger.debug(
"category %s %s is up to date",
self.parent.name,
self.__class__.__name__,
)
def notuptodate(self):
2019-08-14 11:28:01 +01:00
logger.info(
"rendering %s feed for category %s",
self.__class__.__name__,
2019-08-14 11:28:01 +01:00
self.parent.name,
)
def init_fg(self):
fg = FeedGenerator()
fg.id(self.parent.feedurl)
fg.title(self.parent.title)
fg.logo(settings.site.image)
fg.updated(arrow.get(self.mtime).to("utc").datetime)
fg.description(settings.site.headline)
fg.link(href=self.parent.feedurl)
fg.author(
{
"name": settings.author.name,
"email": settings.author.email,
}
)
return fg
class AtomFeed(XMLFeed):
@property
def renderfile(self):
return os.path.join(
self.parent.renderdir,
settings.paths.feed,
settings.filenames.atom,
)
async def render(self):
if self.exists:
self.uptodate()
return
self.notuptodate()
fg = self.init_fg()
for key in self.rkeys:
post = self.parent[key]
fe = self.init_entry(post)
fe.link(
href=post.url, rel="alternate", type="text/html"
2019-06-25 22:48:04 +01:00
)
# fe.content(src=post.url, type="text/html")
fe.content(post.html_content, type="html")
if len(post.summary):
fe.summary(post.summary)
fg.add_entry(fe)
2017-10-28 19:08:40 +01:00
writepath(self.renderfile, fg.atom_str(pretty=True))
2019-03-22 15:49:24 +00:00
class RSSFeed(XMLFeed):
@property
def renderfile(self):
return os.path.join(
self.parent.renderdir,
settings.paths.feed,
settings.filenames.rss,
)
async def render(self):
if self.exists:
self.uptodate()
return
self.notuptodate()
fg = self.init_fg()
for key in self.rkeys:
post = self.parent[key]
fe = self.init_entry(post)
fe.link(href=post.url)
fe.content(post.html_content, type="CDATA")
fg.add_entry(fe)
writepath(self.renderfile, fg.rss_str(pretty=True))
class Year(object):
2019-08-13 17:04:50 +01:00
def __init__(self, parent, year):
self.parent = parent
self.year = str(year)
@cached_property
def keys(self):
year = arrow.get(self.year, "YYYY").to("utc")
keys = []
for key in list(sorted(self.parent.keys(), reverse=True)):
ts = arrow.get(int(key))
if ts <= year.ceil("year") and ts >= year.floor("year"):
keys.append(int(key))
return keys
@property
def posttmplvars(self):
return [self.parent[key].jsonld for key in self.keys]
@property
def mtime(self):
return max(self.keys)
@property
def renderfile(self):
if self.year == self.parent.newest_year:
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir, settings.filenames.html
)
else:
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir,
self.year,
settings.filenames.html,
)
@property
def baseurl(self):
if self.year == self.parent.newest_year:
return self.parent.url
else:
return f"{self.parent.url}{self.year}/"
@property
def template(self):
return "%s.j2.html" % (self.__class__.__name__)
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
@property
def tmplvars(self):
return {
"baseurl": self.baseurl,
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
"fnames": settings.filenames,
"category": {
"name": self.parent.name,
"url": self.parent.url,
"feed": self.parent.feedurl,
"title": self.parent.title,
"paginated": True,
"years": self.parent.years,
2019-08-14 11:28:01 +01:00
"year": self.year,
},
2019-08-14 11:28:01 +01:00
"posts": self.posttmplvars,
2019-03-22 15:49:24 +00:00
}
async def render(self):
if self.exists:
2019-08-14 11:28:01 +01:00
logger.debug(
"category %s is up to date", self.parent.name
)
return
2019-08-14 11:28:01 +01:00
logger.info(
"rendering year %s for category %s",
self.year,
self.parent.name,
)
r = J2.get_template(self.template).render(self.tmplvars)
writepath(self.renderfile, r)
2019-08-14 11:28:01 +01:00
del r
2017-05-23 11:14:47 +01:00
class Flat(object):
def __init__(self, parent):
self.parent = parent
@property
def posttmplvars(self):
return [
self.parent[key].jsonld
2019-08-14 11:28:01 +01:00
for key in list(
sorted(self.parent.keys(), reverse=True)
)
]
@property
def mtime(self):
return max(self.parent.keys())
@property
def renderfile(self):
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir, settings.filenames.html
)
@property
def template(self):
return "%s.j2.html" % (self.__class__.__name__)
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
@property
def tmplvars(self):
return {
"baseurl": self.parent.url,
"site": settings.site,
"menu": settings.menu,
"meta": settings.meta,
"fnames": settings.filenames,
"category": {
"name": self.parent.name,
"url": self.parent.url,
"feed": self.parent.feedurl,
"title": self.parent.title,
},
2019-08-14 11:28:01 +01:00
"posts": self.posttmplvars,
}
async def render(self):
if self.exists:
2019-08-14 11:28:01 +01:00
logger.debug(
"category %s is up to date", self.parent.name
)
return
logger.info("rendering category %s", self.parent.name)
r = J2.get_template(self.template).render(self.tmplvars)
writepath(self.renderfile, r)
2019-08-14 11:28:01 +01:00
del r
class Gopher(object):
def __init__(self, parent):
self.parent = parent
@property
def mtime(self):
return max(self.parent.keys())
@property
def exists(self):
if settings.args.get("force"):
return False
if not os.path.exists(self.renderfile):
return False
if mtime(self.renderfile) >= self.mtime:
return True
return False
@property
def renderfile(self):
2019-08-14 11:28:01 +01:00
return os.path.join(
self.parent.renderdir, settings.filenames.gopher
)
async def render(self):
if self.exists:
2019-08-14 11:28:01 +01:00
logger.debug(
"category %s is up to date", self.parent.name
)
return
2019-08-14 11:28:01 +01:00
lines = [
"%s - %s" % (self.parent.name, settings.site.name),
"",
"",
]
for post in [
self.parent[key]
2019-08-14 11:28:01 +01:00
for key in list(
sorted(self.parent.keys(), reverse=True)
)
]:
line = "0%s\t/%s/%s\t%s\t70" % (
post.title,
post.name,
settings.filenames.txt,
settings.site.name,
)
lines.append(line)
if len(post.txt_summary):
lines.extend(post.txt_summary.split("\n"))
for img in post.images.values():
line = "I%s\t/%s/%s\t%s\t70" % (
img.title,
post.name,
img.name,
2019-06-25 22:48:04 +01:00
settings.site.name,
)
lines.append(line)
lines.append("")
writepath(self.renderfile, "\r\n".join(lines))
2019-08-14 11:28:01 +01:00
class Sitemap(dict):
@property
def mtime(self):
r = 0
if os.path.exists(self.renderfile):
r = mtime(self.renderfile)
return r
def append(self, post):
self[post.url] = post.mtime
@property
def renderfile(self):
return os.path.join(
settings.paths.get("build"), settings.filenames.sitemap
)
async def render(self):
2019-04-09 21:34:03 +01:00
if len(self) > 0:
if self.mtime >= sorted(self.values())[-1]:
return
2019-06-25 22:48:04 +01:00
with open(self.renderfile, "wt") as f:
2019-04-09 21:34:03 +01:00
f.write("\n".join(sorted(self.keys())))
class Webmention(object):
""" outgoing webmention class """
def __init__(self, source, target, dpath, mtime=0):
self.source = source
self.target = target
self.dpath = dpath
if not mtime:
mtime = arrow.utcnow().timestamp
self.mtime = mtime
@property
def fpath(self):
return os.path.join(
self.dpath, "%s.ping" % (url2slug(self.target))
)
@property
def exists(self):
if not os.path.isfile(self.fpath):
return False
elif mtime(self.fpath) > self.mtime:
return True
else:
return False
def save(self, content):
writepath(self.fpath, content)
async def send(self):
if self.exists:
return
elif settings.args.get("noping"):
self.save("noping entry at %s" % arrow.now())
return
telegraph_url = "https://telegraph.p3k.io/webmention"
telegraph_params = {
"token": "%s" % (keys.telegraph.get("token")),
"source": "%s" % (self.source),
"target": "%s" % (self.target),
}
r = requests.post(telegraph_url, data=telegraph_params)
logger.info(
"sent webmention to telegraph from %s to %s",
self.source,
self.target,
)
if r.status_code not in [200, 201, 202]:
logger.error("sending failed: %s %s", r.status_code, r.text)
else:
self.save(r.text)
async def backfill_syndication(self):
""" this is very specific to webmention.io and brid.gy publish """
if not self.exists:
return
if "fed.brid.gy" in self.target:
return
if "brid.gy" not in self.target:
return
if not self.exists:
return
with open(self.fpath, "rt") as f:
txt = f.read()
try:
data = json.loads(txt)
except Exception as e:
""" if it's not a JSON, it's a manually placed file, ignore it """
logger.debug("not a JSON webmention at %s", self.fpath)
return
# unprocessed webmention
if "http_body" not in data and "location" in data:
logger.debug(
"fetching webmention.io respose from %s",
data["location"],
)
wio = requests.get(data["location"])
if wio.status_code != requests.codes.ok:
logger.debug("fetching %s failed", data["location"])
return
try:
wio_json = json.loads(wio.text)
logger.debug("got response %s", wio_json)
if "http_body" in wio_json and isinstance(
wio_json["http_body"], str
):
wio_json.update(
{
"http_body": json.loads(
"".join(wio_json["http_body"])
)
}
)
if "original" in wio_json["http_body"].keys():
wio_json.update(
{
"http_body": wio_json["http_body"][
"original"
]
}
)
data = {**data, **wio_json}
except Exception as e:
logger.error(
"failed JSON from webmention.io %s because: %s",
wio.text,
e,
)
return
logger.debug(
"saving updated webmention.io data %s to %s",
data,
self.fpath,
)
with open(self.fpath, "wt") as update:
update.write(json.dumps(data, sort_keys=True, indent=4))
if "http_body" in data.keys():
# healthy and processed webmention
if (
isinstance(data["http_body"], dict)
and "url" in data["http_body"].keys()
):
url = data["http_body"]["url"]
2019-08-14 11:28:01 +01:00
sp = os.path.join(self.dpath, "%s.copy" % url2slug(url))
if os.path.exists(sp):
logger.debug(
"syndication already exists for %s", url
)
return
with open(sp, "wt") as f:
logger.info(
"writing syndication copy %s to %s", url, sp
)
f.write(url)
return
2019-08-14 11:28:01 +01:00
class WebmentionIO(object):
def __init__(self):
self.params = {
2019-06-25 22:48:04 +01:00
"token": "%s" % (keys.webmentionio.get("token")),
"since": "%s" % str(self.since),
"domain": "%s" % (keys.webmentionio.get("domain")),
}
2019-06-25 22:48:04 +01:00
self.url = "https://webmention.io/api/mentions"
@property
def since(self):
newest = 0
2019-06-25 22:48:04 +01:00
content = settings.paths.get("content")
for e in glob.glob(os.path.join(content, "*", "*", "*.md")):
if os.path.basename(e) == settings.filenames.md:
continue
# filenames are like [received epoch]-[slugified source url].md
try:
2019-06-25 22:48:04 +01:00
mtime = int(os.path.basename(e).split("-")[0])
except Exception as exc:
logger.error(
"int conversation failed: %s, file was: %s", exc, e
)
continue
if mtime > newest:
newest = mtime
return arrow.get(newest + 1)
def makecomment(self, webmention):
2019-06-25 22:48:04 +01:00
if "published_ts" in webmention.get("data"):
maybe = webmention.get("data").get("published")
if not maybe or maybe == "None":
dt = arrow.get(webmention.get("verified_date"))
else:
2019-06-25 22:48:04 +01:00
dt = arrow.get(webmention.get("data").get("published"))
slug = os.path.split(
urlparse(webmention.get("target")).path.lstrip("/")
)[0]
author = webmention.get("data", {}).get("author", None)
if not author:
logger.error(
"missing author info on webmention; skipping; webmention data is: %s",
webmention.get("source"),
)
return
# ignore selfpings
2019-06-25 22:48:04 +01:00
if slug == settings.site.get("name"):
return
elif not len(slug):
logger.error(
"couldn't find post for incoming webmention: %s",
webmention.get("source"),
)
fdir = glob.glob(
os.path.join(settings.paths.get("content"), "*", slug)
)
if not len(fdir):
logger.error(
"couldn't find post for incoming webmention: %s",
webmention.get("source"),
)
return
elif len(fdir) > 1:
logger.error(
"multiple posts found for incoming webmention: %s",
webmention.get("source"),
)
return
fdir = fdir.pop()
fpath = os.path.join(
fdir,
"%d-%s.md"
% (dt.timestamp, url2slug(webmention.get("source"))),
)
2019-06-25 22:48:04 +01:00
author = webmention.get("data", {}).get("author", None)
2019-01-15 21:28:58 +00:00
if not author:
logger.error(
"missing author info on webmention: %s", webmention
)
2019-01-15 21:28:58 +00:00
return
meta = {
2019-06-25 22:48:04 +01:00
"author": {
"name": author.get("name", ""),
"url": author.get("url", ""),
"photo": author.get("photo", ""),
},
2019-06-25 22:48:04 +01:00
"date": str(dt),
"source": webmention.get("source"),
"target": webmention.get("target"),
"type": webmention.get("activity").get(
"type", "webmention"
),
}
try:
2019-06-25 22:48:04 +01:00
txt = webmention.get("data").get("content", "").strip()
except Exception as e:
2019-06-25 22:48:04 +01:00
txt = ""
pass
2019-06-25 22:48:04 +01:00
r = "---\n%s\n---\n\n%s\n" % (utfyamldump(meta), txt)
writepath(fpath, r, mtime=dt.timestamp)
def run(self):
webmentions = requests.get(self.url, params=self.params)
logger.info("queried webmention.io with: %s", webmentions.url)
if webmentions.status_code != requests.codes.ok:
return
try:
mentions = webmentions.json()
2019-06-25 22:48:04 +01:00
for webmention in mentions.get("links"):
self.makecomment(webmention)
except ValueError as e:
2019-06-25 22:48:04 +01:00
logger.error("failed to query webmention.io: %s", e)
pass
def make():
start = int(round(time.time() * 1000))
last = 0
if not (
settings.args.get("offline") or settings.args.get("noservices")
):
incoming = WebmentionIO()
incoming.run()
queue = AQ()
outbox = []
to_archive = []
2019-06-25 22:48:04 +01:00
content = settings.paths.get("content")
rules = IndexPHP()
sitemap = Sitemap()
search = Search()
2018-07-20 16:45:42 +01:00
categories = {}
frontposts = Category()
2019-06-25 22:48:04 +01:00
home = Home(settings.paths.get("home"))
worldmap = WorldMap()
postcount = 0
commentcount = 0
2019-08-14 11:28:01 +01:00
for e in glob.glob(os.path.join(content, "*", "*.url")):
post = Redirect(e)
rules.add_redirect(post.source, post.target)
for e in sorted(
glob.glob(
os.path.join(content, "*", "*", settings.filenames.md)
)
):
2018-07-20 16:45:42 +01:00
post = Singular(e)
if not post.is_future:
postcount = postcount + 1
commentcount = commentcount + len(post.comments)
worldmap.add(post)
for i in post.to_ping:
outbox.append(i)
if not (
settings.args.get("offline")
or settings.args.get("noservices")
):
queue.put(i.backfill_syndication())
for i in post.images.values():
queue.put(i.downsize())
# if not post.is_future and not post.has_archive:
# to_archive.append(post.url)
# render and arbitrary file copy tasks for this very post
queue.put(post.render())
queue.put(post.copy_files())
# skip draft posts from anything further
if post.is_future:
2019-06-25 22:48:04 +01:00
logger.info("%s is for the future", post.name)
continue
# add post to search database
search.append(post)
# start populating sitemap
sitemap.append(post)
# populate redirects, if any
rules.add_redirect(post.shortslug, post.url)
# any category starting with '_' are special: they shouldn't have a
# category archive page
if post.is_page:
continue
# populate the category with the post
if post.category not in categories:
categories[post.category] = Category(post.category)
categories[post.category][post.published.timestamp] = post
# add to front, if allowed
if post.is_front:
frontposts[post.published.timestamp] = post
2018-07-20 16:45:42 +01:00
# commit to search database - this saves quite a few disk writes
2018-07-22 11:33:59 +01:00
search.__exit__()
# render search and sitemap
queue.put(search.render())
queue.put(sitemap.render())
# make gone and redirect arrays for PHP
2019-06-25 22:48:04 +01:00
for e in glob.glob(os.path.join(content, "*", "*.del")):
post = Gone(e)
queue.put(post.render())
rules.add_gone(post.source)
2019-06-25 22:48:04 +01:00
for e in glob.glob(os.path.join(content, "*", "*.url")):
post = Redirect(e)
rules.add_redirect(post.source, post.target)
queue.put(post.render())
# render 404 fallback PHP
queue.put(rules.render())
# render categories
2018-07-20 16:45:42 +01:00
for category in categories.values():
2019-01-15 21:28:58 +00:00
home.add(category, category.get(category.sortedkeys[0]))
queue.put(category.render())
queue.put(frontposts.render_feeds())
2019-01-15 21:28:58 +00:00
queue.put(home.render())
queue.put(worldmap.render())
fediversemockery = FediverseStats(postcount, commentcount)
queue.put(fediversemockery.render())
micropub = Micropub()
queue.put(micropub.render())
queue.run()
# copy static files
2019-06-25 22:48:04 +01:00
for e in glob.glob(os.path.join(content, "*.*")):
if e.endswith(".md"):
2019-01-15 21:28:58 +00:00
continue
t = os.path.join(
settings.paths.get("build"), os.path.basename(e)
)
maybe_copy(e, t)
2018-07-20 16:45:42 +01:00
end = int(round(time.time() * 1000))
2019-06-25 22:48:04 +01:00
logger.info("process took %d ms" % (end - start))
2019-06-25 22:48:04 +01:00
if not settings.args.get("offline"):
# upload site
try:
2019-06-25 22:48:04 +01:00
logger.info("starting syncing")
os.system(
2019-06-25 22:48:04 +01:00
"rsync -avuhH --delete-after %s/ %s/"
% (
settings.paths.get("build"),
"%s/%s"
% (
settings.syncserver,
settings.paths.get("remotewww"),
),
)
)
2019-06-25 22:48:04 +01:00
logger.info("syncing finished")
except Exception as e:
2019-06-25 22:48:04 +01:00
logger.error("syncing failed: %s", e)
if not settings.args.get("noservices"):
logger.info("sending webmentions")
for wm in outbox:
queue.put(wm.send())
queue.run()
logger.info("sending webmentions finished")
2019-06-25 22:48:04 +01:00
if __name__ == "__main__":
2018-07-20 16:45:42 +01:00
make()