post-black
@@ -14,9 +14,10 @@ import keys
import settings EXIFDATE = re.compile( - r'^(?P<year>[0-9]{4}):(?P<month>[0-9]{2}):(?P<day>[0-9]{2})\s+' - r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$' + r"^(?P<year>[0-9]{4}):(?P<month>[0-9]{2}):(?P<day>[0-9]{2})\s+" + r"(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$" ) + class CachedMeta(dict): def __init__(self, fpath):@@ -25,15 +26,11 @@
@property def cfile(self): fname = os.path.basename(self.fpath) - if fname == 'index.md': + if fname == "index.md": fname = os.path.basename(os.path.dirname(self.fpath)) return os.path.join( - settings.tmpdir, - "%s.%s.json" % ( - fname, - self.__class__.__name__, - ) + settings.tmpdir, "%s.%s.json" % (fname, self.__class__.__name__) ) @property@@ -53,16 +50,14 @@ else:
self._cache_read() def _cache_update(self): - with open(self.cfile, 'wt') as f: + with open(self.cfile, "wt") as f: logging.debug( - "writing cached meta file of %s to %s", - self.fpath, - self.cfile + "writing cached meta file of %s to %s", self.fpath, self.cfile ) f.write(json.dumps(self, indent=4, sort_keys=True)) def _cache_read(self): - with open(self.cfile, 'rt') as f: + with open(self.cfile, "rt") as f: data = json.loads(f.read()) for k, v in data.items(): self[k] = v@@ -85,57 +80,55 @@
""" cmd = ( "exiftool", - '-sort', - '-json', - '-MIMEType', - '-FileType', - '-FileName', - '-FileSize#', - '-ModifyDate', - '-CreateDate', - '-DateTimeOriginal', - '-ImageHeight', - '-ImageWidth', - '-Aperture', - '-FOV', - '-ISO', - '-FocalLength', - '-FNumber', - '-FocalLengthIn35mmFormat', - '-ExposureTime', - '-Model', - '-GPSLongitude#', - '-GPSLatitude#', - '-LensID', - '-LensSpec', - '-Lens', - '-ReleaseDate', - '-Description', - '-Headline', - '-HierarchicalSubject', - '-Copyright', - '-Artist', - self.fpath + "-sort", + "-json", + "-MIMEType", + "-FileType", + "-FileName", + "-FileSize#", + "-ModifyDate", + "-CreateDate", + "-DateTimeOriginal", + "-ImageHeight", + "-ImageWidth", + "-Aperture", + "-FOV", + "-ISO", + "-FocalLength", + "-FNumber", + "-FocalLengthIn35mmFormat", + "-ExposureTime", + "-Model", + "-GPSLongitude#", + "-GPSLatitude#", + "-LensID", + "-LensSpec", + "-Lens", + "-ReleaseDate", + "-Description", + "-Headline", + "-HierarchicalSubject", + "-Copyright", + "-Artist", + self.fpath, ) p = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = p.communicate() if stderr: raise OSError("Error reading EXIF:\n\t%s\n\t%s", cmd, stderr) - exif = json.loads(stdout.decode('utf-8').strip()).pop() - if 'ReleaseDate' in exif and 'ReleaseTime' in exif: - exif['DateTimeRelease'] = "%s %s" % ( - exif.get('ReleaseDate'), exif.get('ReleaseTime')[:8] + exif = json.loads(stdout.decode("utf-8").strip()).pop() + if "ReleaseDate" in exif and "ReleaseTime" in exif: + exif["DateTimeRelease"] = "%s %s" % ( + exif.get("ReleaseDate"), + exif.get("ReleaseTime")[:8], ) - del(exif['ReleaseDate']) - del(exif['ReleaseTime']) + del exif["ReleaseDate"] + del exif["ReleaseTime"] for k, v in exif.items(): self[k] = self.exifdate2rfc(v)@@ -154,8 +147,8 @@ match = EXIFDATE.match(value)
if not match: return value return "%s-%s-%sT%s+00:00" % ( - match.group('year'), - match.group('month'), - match.group('day'), - match.group('time') + match.group("year"), + match.group("month"), + match.group("day"), + match.group("time"), )
@@ -41,33 +41,26 @@ import settings
from settings import struct import keys -logger = logging.getLogger('NASG') +logger = logging.getLogger("NASG") -MarkdownImage = namedtuple( - 'MarkdownImage', - ['match', 'alt', 'fname', 'title', 'css'] -) +MarkdownImage = namedtuple("MarkdownImage", ["match", "alt", "fname", "title", "css"]) J2 = jinja2.Environment( - loader=jinja2.FileSystemLoader(searchpath=settings.paths.get('tmpl')), + loader=jinja2.FileSystemLoader(searchpath=settings.paths.get("tmpl")), lstrip_blocks=True, - trim_blocks=True + trim_blocks=True, ) RE_MDIMG = re.compile( - r'(?P<match>!\[(?P<alt>[^\]]+)?\]\((?P<fname>[^\s\]]+)' - r'(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:{(?P<css>[^\}]+)\})?)', - re.IGNORECASE + r"(?P<match>!\[(?P<alt>[^\]]+)?\]\((?P<fname>[^\s\]]+)" + r"(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:{(?P<css>[^\}]+)\})?)", + re.IGNORECASE, ) -RE_CODE = re.compile( - r'^(?:[~`]{3,4}).+$', - re.MULTILINE -) +RE_CODE = re.compile(r"^(?:[~`]{3,4}).+$", re.MULTILINE) -RE_PRECODE = re.compile( - r'<pre class="([^"]+)"><code>' -) +RE_PRECODE = re.compile(r'<pre class="([^"]+)"><code>') + def mtime(path): """ return seconds level mtime or 0 (chomp microsecs) """@@ -78,33 +71,26 @@
def utfyamldump(data): """ dump YAML with actual UTF-8 chars """ - return yaml.dump( - data, - default_flow_style=False, - indent=4, - allow_unicode=True - ) + return yaml.dump(data, default_flow_style=False, indent=4, allow_unicode=True) def url2slug(url, limit=200): """ convert URL to max 200 char ASCII string """ - return slugify( - re.sub(r"^https?://(?:www)?", "", url), - only_ascii=True, - lower=True - )[:limit] + return slugify(re.sub(r"^https?://(?:www)?", "", url), only_ascii=True, lower=True)[ + :limit + ] -J2.filters['url2slug'] = url2slug +J2.filters["url2slug"] = url2slug def rfc3339todt(rfc3339): """ nice dates for humans """ - t = arrow.get(rfc3339).format('YYYY-MM-DD HH:mm ZZZ') + t = arrow.get(rfc3339).format("YYYY-MM-DD HH:mm ZZZ") return "%s" % (t) -J2.filters['printdate'] = rfc3339todt +J2.filters["printdate"] = rfc3339todt def extractlicense(url):@@ -112,13 +98,11 @@ """ extract license name """
n, e = os.path.splitext(os.path.basename(url)) return n.upper() -J2.filters['extractlicense'] = extractlicense + +J2.filters["extractlicense"] = extractlicense RE_MYURL = re.compile( - r'(^(%s[^"]+)$|"(%s[^"]+)")' % ( - settings.site.url, - settings.site.url - ) + r'(^(%s[^"]+)$|"(%s[^"]+)")' % (settings.site.url, settings.site.url) )@@ -134,7 +118,7 @@ else:
url = standalone r = os.path.relpath(url, baseurl) - if url.endswith('/') and not r.endswith('/'): + if url.endswith("/") and not r.endswith("/"): r = "%s/%s" % (r, settings.filenames.html) if needsquotes: r = '"%s"' % r@@ -143,21 +127,21 @@ text = text.replace(match, r)
return text -J2.filters['relurl'] = relurl +J2.filters["relurl"] = relurl def writepath(fpath, content, mtime=0): """ f.write with extras """ d = os.path.dirname(fpath) if not os.path.isdir(d): - logger.debug('creating directory tree %s', d) + logger.debug("creating directory tree %s", d) os.makedirs(d) if isinstance(content, str): - mode = 'wt' + mode = "wt" else: - mode = 'wb' + mode = "wb" with open(fpath, mode) as f: - logger.info('writing file %s', fpath) + logger.info("writing file %s", fpath) f.write(content)@@ -214,12 +198,7 @@ self.mtime = mtime
@property def fpath(self): - return os.path.join( - self.dpath, - '%s.ping' % ( - url2slug(self.target, 200) - ) - ) + return os.path.join(self.dpath, "%s.ping" % (url2slug(self.target, 200))) def check_syndication(self): """ this is very specific to webmention.io and brid.gy publish """@@ -242,7 +221,9 @@ maybe = json.loads(txt)
if "location" not in maybe: return if "http_body" not in maybe: - logger.debug("trying to re-fetch %s for %s", maybe["location"], self.fpath) + logger.debug( + "trying to re-fetch %s for %s", maybe["location"], self.fpath + ) wio = requests.get(maybe["location"]) if wio.status_code != requests.codes.ok: return@@ -260,11 +241,7 @@ with open(sp, "wt") as f:
logger.info("writing syndication copy %s to %s", url, sp) f.write(url) except Exception as e: - logger.error( - "failed to fetch syndication URL for %s: %s", - self.dpath, - e - ) + logger.error("failed to fetch syndication URL for %s: %s", self.dpath, e) pass @property@@ -283,29 +260,29 @@ async def send(self):
if self.exists: self.check_syndication() return - elif settings.args.get('noping'): - self.save("noping entry at %s" % arrow.now() ) + elif settings.args.get("noping"): + self.save("noping entry at %s" % arrow.now()) return - telegraph_url = 'https://telegraph.p3k.io/webmention' + telegraph_url = "https://telegraph.p3k.io/webmention" telegraph_params = { - 'token': '%s' % (keys.telegraph.get('token')), - 'source': '%s' % (self.source), - 'target': '%s' % (self.target) + "token": "%s" % (keys.telegraph.get("token")), + "source": "%s" % (self.source), + "target": "%s" % (self.target), } r = requests.post(telegraph_url, data=telegraph_params) logger.info( - "sent webmention to telegraph from %s to %s", - self.source, - self.target + "sent webmention to telegraph from %s to %s", self.source, self.target ) if r.status_code not in [200, 201, 202]: - logger.error('sending failed: %s %s', r.status_code, r.text) + logger.error("sending failed: %s %s", r.status_code, r.text) else: self.save(r.text) + class MarkdownDoc(object): """ Base class for anything that is stored as .md """ + @property def mtime(self): return mtime(self.fpath)@@ -314,28 +291,25 @@ @property
def dt(self): """ this returns a timestamp, not an arrow object """ maybe = self.mtime - for key in ['published', 'date']: + for key in ["published", "date"]: t = self.meta.get(key, None) - if t and 'null' != t: + if t and "null" != t: try: t = arrow.get(t) if t.timestamp > maybe: maybe = t.timestamp except Exception as e: logger.error( - 'failed to parse date: %s for key %s in %s', - t, - key, - self.fpath + "failed to parse date: %s for key %s in %s", t, key, self.fpath ) return maybe @cached_property def _parsed(self): - with open(self.fpath, mode='rt') as f: - logger.debug('parsing YAML+MD file %s', self.fpath) + with open(self.fpath, mode="rt") as f: + logger.debug("parsing YAML+MD file %s", self.fpath) meta, txt = frontmatter.parse(f.read()) - return(meta, txt) + return (meta, txt) @cached_property def meta(self):@@ -351,14 +325,11 @@ c = "%s" % (self.content)
if not len(c): return c - if hasattr(self, 'images') and len(self.images): + if hasattr(self, "images") and len(self.images): for match, img in self.images.items(): c = c.replace(match, str(img)) c = str(PandocMD2HTML(c)) - c = RE_PRECODE.sub( - '<pre><code lang="\g<1>" class="language-\g<1>">', - c - ) + c = RE_PRECODE.sub('<pre><code lang="\g<1>" class="language-\g<1>">', c) return c@@ -368,8 +339,8 @@ self.fpath = fpath
@property def dt(self): - maybe = self.meta.get('date') - if maybe and 'null' != maybe: + maybe = self.meta.get("date") + if maybe and "null" != maybe: dt = arrow.get(maybe) else: dt = arrow.get(mtime(self.fpath))@@ -377,41 +348,37 @@ return dt
@property def targetname(self): - t = urlparse(self.meta.get('target')) - return os.path.split(t.path.lstrip('/'))[0] - #t = urlparse(self.meta.get('target')) - #return t.path.rstrip('/').strip('/').split('/')[-1] + t = urlparse(self.meta.get("target")) + return os.path.split(t.path.lstrip("/"))[0] + # t = urlparse(self.meta.get('target')) + # return t.path.rstrip('/').strip('/').split('/')[-1] @property def source(self): - return self.meta.get('source') + return self.meta.get("source") @property def author(self): r = { "@context": "http://schema.org", "@type": "Person", - 'name': urlparse(self.source).hostname, - 'url': self.source + "name": urlparse(self.source).hostname, + "url": self.source, } - author = self.meta.get('author') + author = self.meta.get("author") if not author: return r - if 'name' in author: - r.update({ - 'name': self.meta.get('author').get('name') - }) - elif 'url' in author: - r.update({ - 'name': urlparse(self.meta.get('author').get('url')).hostname - }) + if "name" in author: + r.update({"name": self.meta.get("author").get("name")}) + elif "url" in author: + r.update({"name": urlparse(self.meta.get("author").get("url")).hostname}) return r @property def type(self): - return self.meta.get('type', 'webmention') + return self.meta.get("type", "webmention") # if len(self.content): - #maybe = clean(self.content, strip=True) + # maybe = clean(self.content, strip=True) # if maybe in UNICODE_EMOJI: # return maybe@@ -422,9 +389,9 @@ "@context": "http://schema.org",
"@type": "Comment", "author": self.author, "url": self.source, - "discussionUrl": self.meta.get('target'), + "discussionUrl": self.meta.get("target"), "datePublished": str(self.dt), - "disambiguatingDescription": self.type + "disambiguatingDescription": self.type, } return r@@ -440,17 +407,11 @@ self.mtime = mtime(fpath)
@property def renderdir(self): - return os.path.join( - settings.paths.get('build'), - self.source - ) + return os.path.join(settings.paths.get("build"), self.source) @property def renderfile(self): - return os.path.join( - self.renderdir, - settings.filenames.html - ) + return os.path.join(self.renderdir, settings.filenames.html) @property def source(self):@@ -463,22 +424,20 @@ return "%s.j2.html" % (self.__class__.__name__)
@property def tmplvars(self): - return { - 'source': self.source - } + return {"source": self.source} async def render(self): if os.path.exists(self.renderfile): rmtree(os.path.dirname(self.renderfile)) - #logger.info( - #'rendering %s to %s', - #self.__class__.__name__, - #self.source - #) - #r = J2.get_template(self.template).render( - #self.tmplvars - #) - #writepath(self.renderfile, r) + # logger.info( + #'rendering %s to %s', + # self.__class__.__name__, + # self.source + # ) + # r = J2.get_template(self.template).render( + # self.tmplvars + # ) + # writepath(self.renderfile, r) class Redirect(Gone):@@ -488,17 +447,15 @@ """
@cached_property def target(self): - target = '' - with open(self.fpath, 'rt') as f: + target = "" + with open(self.fpath, "rt") as f: target = f.read().strip() return target @property def tmplvars(self): - return { - 'source': self.source, - 'target': self.target - } + return {"source": self.source, "target": self.target} + class Singular(MarkdownDoc): """@@ -521,8 +478,8 @@ (ending with .md) files
""" return [ k - for k in glob.glob(os.path.join(os.path.dirname(self.fpath), '*.*')) - if not k.startswith('.') + for k in glob.glob(os.path.join(os.path.dirname(self.fpath), "*.*")) + if not k.startswith(".") ] @property@@ -533,7 +490,6 @@ for c in self.comments.values():
if c.dt > maybe: maybe = c.dt return maybe - @property def dt(self):@@ -546,13 +502,8 @@
@property def sameas(self): r = {} - for k in glob.glob( - os.path.join( - os.path.dirname(self.fpath), - '*.copy' - ) - ): - with open(k, 'rt') as f: + for k in glob.glob(os.path.join(os.path.dirname(self.fpath), "*.copy")): + with open(k, "rt") as f: r.update({f.read(): True}) return list(r.keys())@@ -565,7 +516,7 @@ """
comments = {} files = [ k - for k in glob.glob(os.path.join(os.path.dirname(self.fpath), '*.md')) + for k in glob.glob(os.path.join(os.path.dirname(self.fpath), "*.md")) if os.path.basename(k) != settings.filenames.md ] for f in files:@@ -584,24 +535,18 @@ """
images = {} for match, alt, fname, title, css in RE_MDIMG.findall(self.content): mdimg = MarkdownImage(match, alt, fname, title, css) - imgpath = os.path.join( - os.path.dirname(self.fpath), - fname - ) + imgpath = os.path.join(os.path.dirname(self.fpath), fname) if imgpath in self.files: kind = filetype.guess(imgpath) - if kind and 'image' in kind.mime.lower(): + if kind and "image" in kind.mime.lower(): images.update({match: WebImage(imgpath, mdimg, self)}) else: - logger.error("Missing image: %s, referenced in %s", - imgpath, - self.fpath - ) + logger.error("Missing image: %s, referenced in %s", imgpath, self.fpath) return images @property def is_page(self): - if self.category.startswith('_'): + if self.category.startswith("_"): return True return False@@ -637,7 +582,7 @@ return next(iter(self.images.values()))
@property def summary(self): - return self.meta.get('summary', '') + return self.meta.get("summary", "") @cached_property def html_summary(self):@@ -656,35 +601,27 @@ @property
def title(self): if self.is_reply: return "RE: %s" % self.is_reply - return self.meta.get( - 'title', - self.published.format(settings.displaydate) - ) + return self.meta.get("title", self.published.format(settings.displaydate)) @property def tags(self): - return self.meta.get('tags', []) + return self.meta.get("tags", []) @property def syndicate(self): - urls = self.meta.get('syndicate', []) + urls = self.meta.get("syndicate", []) urls.append("https://fed.brid.gy/") if self.is_photo: urls.append("https://brid.gy/publish/flickr") return urls - def baseN(self, num, b=36, - numerals="0123456789abcdefghijklmnopqrstuvwxyz"): + def baseN(self, num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"): """ Creates short, lowercase slug for a number (an epoch) passed """ num = int(num) return ((num == 0) and numerals[0]) or ( - self.baseN( - num // b, - b, - numerals - ).lstrip(numerals[0]) + numerals[num % b] + self.baseN(num // b, b, numerals).lstrip(numerals[0]) + numerals[num % b] ) @property@@ -696,16 +633,16 @@ def published(self):
# ok, so here's a hack: because I have no idea when my older photos # were actually published, any photo from before 2014 will have # the EXIF createdate as publish date - pub = arrow.get(self.meta.get('published')) + pub = arrow.get(self.meta.get("published")) if self.is_photo: - maybe = arrow.get(self.photo.exif.get('CreateDate')) + maybe = arrow.get(self.photo.exif.get("CreateDate")) if maybe.year < settings.photo.earlyyears: pub = maybe return pub @property def is_reply(self): - return self.meta.get('in-reply-to', False) + return self.meta.get("in-reply-to", False) @property def is_future(self):@@ -718,55 +655,45 @@ def to_ping(self):
urls = [] if not self.is_page and self.is_front: w = Webmention( - self.url, - 'https://fed.brid.gy/', - os.path.dirname(self.fpath), - self.dt + self.url, "https://fed.brid.gy/", os.path.dirname(self.fpath), self.dt ) urls.append(w) if self.is_reply: w = Webmention( - self.url, - self.is_reply, - os.path.dirname(self.fpath), - self.dt + self.url, self.is_reply, os.path.dirname(self.fpath), self.dt ) urls.append(w) elif self.is_photo: w = Webmention( self.url, - 'https://brid.gy/publish/flickr', + "https://brid.gy/publish/flickr", os.path.dirname(self.fpath), - self.dt + self.dt, ) urls.append(w) return urls @property def licence(self): - k = '_default' + k = "_default" if self.category in settings.licence: k = self.category return settings.licence[k] @property def lang(self): - lang = 'en' + lang = "en" try: - lang = langdetect.detect("\n".join([ - self.meta.get('title', ''), - self.content - ])) + lang = langdetect.detect( + "\n".join([self.meta.get("title", ""), self.content]) + ) except BaseException: pass return lang @property def url(self): - return "%s/%s/" % ( - settings.site.get('url'), - self.name - ) + return "%s/%s/" % (settings.site.get("url"), self.name) @property def has_code(self):@@ -777,10 +704,10 @@ return False
@cached_property def review(self): - if 'review' not in self.meta: + if "review" not in self.meta: return False - review = self.meta.get('review') - rated, outof = review.get('rating').split('/') + review = self.meta.get("review") + rated, outof = review.get("rating").split("/") r = { "@context": "https://schema.org/", "@type": "Review",@@ -789,32 +716,32 @@ "@type": "Rating",
"@context": "http://schema.org", "ratingValue": rated, "bestRating": outof, - "worstRating": 1 + "worstRating": 1, }, - "name": review.get('title'), - "text": review.get('summary'), - "url": review.get('url'), + "name": review.get("title"), + "text": review.get("summary"), + "url": review.get("url"), "author": settings.author, } return r @cached_property def event(self): - if 'event' not in self.meta: + if "event" not in self.meta: return False - event = self.meta.get('event', {}) + event = self.meta.get("event", {}) r = { "@context": "http://schema.org", "@type": "Event", - "endDate": str(arrow.get(event.get('end'))), - "startDate": str(arrow.get(event.get('start'))), + "endDate": str(arrow.get(event.get("end"))), + "startDate": str(arrow.get(event.get("start"))), "location": { "@context": "http://schema.org", "@type": "Place", - "address": event.get('location'), - "name": event.get('location'), + "address": event.get("location"), + "name": event.get("location"), }, - "name": self.title + "name": self.title, } return r@@ -831,7 +758,7 @@ "genre": self.category,
"mainEntityOfPage": "%s#article" % (self.url), "dateModified": str(arrow.get(self.dt)), "datePublished": str(self.published), - "copyrightYear": str(self.published.format('YYYY')), + "copyrightYear": str(self.published.format("YYYY")), "license": "https://spdx.org/licenses/%s.html" % (self.licence), "image": settings.site.image, "author": settings.author,@@ -843,40 +770,40 @@ "description": self.html_summary,
"potentialAction": [], "comment": [], "commentCount": len(self.comments.keys()), - "keywords": self.tags + "keywords": self.tags, } if self.is_photo: - r.update({ - "@type": "Photograph", - #"image": self.photo.jsonld, - }) + r.update( + { + "@type": "Photograph", + # "image": self.photo.jsonld, + } + ) elif self.has_code: - r.update({ - "@type": "TechArticle", - }) + r.update({"@type": "TechArticle"}) elif self.is_page: - r.update({ - "@type": "WebPage", - }) + r.update({"@type": "WebPage"}) if len(self.images): r["image"] = [] for img in list(self.images.values()): r["image"].append(img.jsonld) # if not self.is_photo and len(self.images): - # img = list(self.images.values())[0] - # r.update({ - # "image": img.jsonld, - # }) + # img = list(self.images.values())[0] + # r.update({ + # "image": img.jsonld, + # }) if self.is_reply: - r.update({ - "mentions": { - "@context": "http://schema.org", - "@type": "Thing", - "url": self.is_reply + r.update( + { + "mentions": { + "@context": "http://schema.org", + "@type": "Thing", + "url": self.is_reply, + } } - }) + ) if self.review: r.update({"review": self.review})@@ -884,15 +811,13 @@
if self.event: r.update({"subjectOf": self.event}) - #for donation in settings.donateActions: - #r["potentialAction"].append(donation) + # for donation in settings.donateActions: + # r["potentialAction"].append(donation) for url in list(set(self.syndicate)): - r["potentialAction"].append({ - "@context": "http://schema.org", - "@type": "InteractAction", - "url": url - }) + r["potentialAction"].append( + {"@context": "http://schema.org", "@type": "InteractAction", "url": url} + ) for mtime in sorted(self.comments.keys()): r["comment"].append(self.comments[mtime].jsonld)@@ -909,24 +834,15 @@ return "%s.j2.txt" % (self.__class__.__name__)
@property def renderdir(self): - return os.path.join( - settings.paths.get('build'), - self.name - ) + return os.path.join(settings.paths.get("build"), self.name) @property def renderfile(self): - return os.path.join( - self.renderdir, - settings.filenames.html - ) + return os.path.join(self.renderdir, settings.filenames.html) @property def mementofile(self): - return os.path.join( - os.path.dirname(self.fpath), - settings.filenames.memento - ) + return os.path.join(os.path.dirname(self.fpath), settings.filenames.memento) @property def has_memento(self):@@ -937,34 +853,26 @@ return False
@property def gopherfile(self): - return os.path.join( - self.renderdir, - settings.filenames.txt - ) + return os.path.join(self.renderdir, settings.filenames.txt) @property def exists(self): - if settings.args.get('force'): - logger.debug('rendering required: force mode on') + if settings.args.get("force"): + logger.debug("rendering required: force mode on") return False elif not os.path.exists(self.renderfile): - logger.debug('rendering required: no html yet') + logger.debug("rendering required: no html yet") return False elif self.dt > mtime(self.renderfile): - logger.debug('rendering required: self.dt > html mtime') + logger.debug("rendering required: self.dt > html mtime") return False else: - logger.debug('rendering not required') + logger.debug("rendering not required") return True @property def corpus(self): - return "\n".join([ - self.title, - self.name, - self.summary, - self.content, - ]) + return "\n".join([self.title, self.name, self.summary, self.content]) @cached_property def oembed_xml(self):@@ -974,66 +882,48 @@ for k, v in self.oembed_json.items():
x = etree.SubElement(oembed, k) t = "%s" % (v) if "html" == k: - x.text = etree.CDATA(t) + x.text = etree.CDATA(t) else: x.text = t s = etree.tostring( - xmldoc, - encoding='utf-8', - xml_declaration=True, - pretty_print=True + xmldoc, encoding="utf-8", xml_declaration=True, pretty_print=True ) return s @cached_property def oembed_json(self): r = { - "version": "1.0", - "provider_name": settings.site.name, - "provider_url": settings.site.url, - "author_name": settings.author.name, - "author_url": settings.author.url, - "title": self.title, - "type": "link" + "version": "1.0", + "provider_name": settings.site.name, + "provider_url": settings.site.url, + "author_name": settings.author.name, + "author_url": settings.author.url, + "title": self.title, + "type": "link", } if self.is_photo: - r.update({ - "type": "photo", - "url": self.photo.jsonld.thumbnail.url, - "width": self.photo.jsonld.thumbnail.width, - "height": self.photo.jsonld.thumbnail.height - }) + r.update( + { + "type": "photo", + "url": self.photo.jsonld.thumbnail.url, + "width": self.photo.jsonld.thumbnail.width, + "height": self.photo.jsonld.thumbnail.height, + } + ) return r async def copyfiles(self): - exclude = [ - '.md', - '.jpg', - '.png', - '.gif', - '.ping', - '.url', - '.del', - '.copy' - ] - files = glob.glob( - os.path.join( - os.path.dirname(self.fpath), - '*.*' - ) - ) + exclude = [".md", ".jpg", ".png", ".gif", ".ping", ".url", ".del", ".copy"] + files = glob.glob(os.path.join(os.path.dirname(self.fpath), "*.*")) for f in files: fname, fext = os.path.splitext(f) if fext.lower() in exclude: continue t = os.path.join( - settings.paths.get('build'), - self.name, - os.path.basename(f) + settings.paths.get("build"), self.name, os.path.basename(f) ) - if os.path.exists(t) and mtime( - f) <= mtime(t): + if os.path.exists(t) and mtime(f) <= mtime(t): continue logger.info("copying '%s' to '%s'", f, t) cp(f, t)@@ -1043,31 +933,30 @@ cp(self.renderfile, self.mementofile)
return async def save_to_archiveorg(self): - requests.get('http://web.archive.org/save/%s' % (self.url)) + requests.get("http://web.archive.org/save/%s" % (self.url)) def try_memento(self, url): try: params = { - 'url': url, - 'timestamp': '%s' % (self.published.format('YYYY-MM-DD')) + "url": url, + "timestamp": "%s" % (self.published.format("YYYY-MM-DD")), } - waybackmachine = 'http://archive.org/wayback/available' + waybackmachine = "http://archive.org/wayback/available" snapshots = requests.get(waybackmachine, params=params).json() # no archived version... - if not len(snapshots.get('archived_snapshots', None)): - logger.warning('no snapshot found for %s', url) + if not len(snapshots.get("archived_snapshots", None)): + logger.warning("no snapshot found for %s", url) return None else: - logger.info('snapshot FOUND for %s', url) + logger.info("snapshot FOUND for %s", url) - snapshot = snapshots.get('archived_snapshots').get('closest') - logger.info('getting %s', snapshot['url']) - original = requests.get(snapshot['url']) + snapshot = snapshots.get("archived_snapshots").get("closest") + logger.info("getting %s", snapshot["url"]) + original = requests.get(snapshot["url"]) return original.text except Exception as e: - logger.warning('wayback memento failed for %s: %s', url, e) + logger.warning("wayback memento failed for %s: %s", url, e) return None - def maybe_fetch_memento(self): if self.has_memento:@@ -1077,52 +966,47 @@ # this commented out part is extremely specific to my old site
# but it helps anyone who had multiple domains and/or taxonomy # structures # formerdomains = [ - # 'cadeyrn.webporfolio.hu', - # 'blog.petermolnar.eu', - # 'petermolnar.eu', - # 'petermolnar.net', + # 'cadeyrn.webporfolio.hu', + # 'blog.petermolnar.eu', + # 'petermolnar.eu', + # 'petermolnar.net', # ] # formercategories = [ - # 'linux-tech-coding', - # 'diy-do-it-yourself', - # 'photoblog', - # 'it', - # 'sysadmin-blog', - # 'sysadmin', - # 'fotography', - # 'blips', - # 'blog', - # 'r' + # 'linux-tech-coding', + # 'diy-do-it-yourself', + # 'photoblog', + # 'it', + # 'sysadmin-blog', + # 'sysadmin', + # 'fotography', + # 'blips', + # 'blog', + # 'r' # ] # for domain in formerdomains: - # maybe = None - # url = url = 'http://%s/%s/' % (domain, self.name) - # maybe = self.try_memento(url) - # if maybe: - # break - # for formercategory in formercategories: - # url = 'http://%s/%s/%s/' % (domain, formercategory, self.name) - # maybe = self.try_memento(url) - # if maybe: - # break - # if maybe: - # break + # maybe = None + # url = url = 'http://%s/%s/' % (domain, self.name) + # maybe = self.try_memento(url) + # if maybe: + # break + # for formercategory in formercategories: + # url = 'http://%s/%s/%s/' % (domain, formercategory, self.name) + # maybe = self.try_memento(url) + # if maybe: + # break + # if maybe: + # break maybe = self.try_memento(self.url) if maybe: - with open(self.mementofile, 'wt') as f: - logger.info( - 'saving memento for %s to %s', - self.name, - self.mementofile - ) + with open(self.mementofile, "wt") as f: + logger.info("saving memento for %s to %s", self.name, self.mementofile) f.write(maybe) - async def render(self): - if settings.args.get('memento'): + if settings.args.get("memento"): self.maybe_fetch_memento() if self.exists:@@ -1134,53 +1018,46 @@ memento = "%s%s" % (self.url, settings.filenames.memento)
logger.info("rendering %s", self.name) v = { - 'baseurl': self.url, - 'post': self.jsonld, - 'site': settings.site, - 'menu': settings.menu, - 'meta': settings.meta, - 'fnames': settings.filenames, - 'memento': memento + "baseurl": self.url, + "post": self.jsonld, + "site": settings.site, + "menu": settings.menu, + "meta": settings.meta, + "fnames": settings.filenames, + "memento": memento, } - writepath( - self.renderfile, - J2.get_template(self.template).render(v) - ) - del(v) + writepath(self.renderfile, J2.get_template(self.template).render(v)) + del v g = { - 'post': self.jsonld, - 'summary': self.txt_summary, - 'content': self.txt_content + "post": self.jsonld, + "summary": self.txt_summary, + "content": self.txt_content, } - writepath( - self.gopherfile, - J2.get_template(self.gophertemplate).render(g) - ) - del(g) + writepath(self.gopherfile, J2.get_template(self.gophertemplate).render(g)) + del g j = settings.site.copy() - j.update({ - "mainEntity": self.jsonld - }) + j.update({"mainEntity": self.jsonld}) writepath( os.path.join(self.renderdir, settings.filenames.json), - json.dumps(j, indent=4, ensure_ascii=False) + json.dumps(j, indent=4, ensure_ascii=False), ) - del(j) + del j if not os.path.exists(self.mementofile): if self.published.timestamp >= settings.mementostartime: copy(self.renderfile, self.mementofile) # oembed # writepath( - # os.path.join(self.renderdir, settings.filenames.oembed_json), - # json.dumps(self.oembed_json, indent=4, ensure_ascii=False) + # os.path.join(self.renderdir, settings.filenames.oembed_json), + # json.dumps(self.oembed_json, indent=4, ensure_ascii=False) # ) # writepath( - # os.path.join(self.renderdir, settings.filenames.oembed_xml), - # self.oembed_xml + # os.path.join(self.renderdir, settings.filenames.oembed_xml), + # self.oembed_xml # ) + class Home(Singular): def __init__(self, fpath):@@ -1192,20 +1069,17 @@ self.posts.append((category.ctmplvars, post.jsonld))
@property def renderdir(self): - return settings.paths.get('build') + return settings.paths.get("build") @property def renderfile(self): - return os.path.join( - settings.paths.get('build'), - settings.filenames.html - ) + return os.path.join(settings.paths.get("build"), settings.filenames.html) @property def dt(self): maybe = super().dt for cat, post in self.posts: - pts = arrow.get(post['dateModified']).timestamp + pts = arrow.get(post["dateModified"]).timestamp if pts > maybe: maybe = pts return maybe@@ -1213,40 +1087,39 @@
async def render_gopher(self): lines = [ "%s's gopherhole - phlog, if you prefer" % (settings.site.name), - '', - '' + "", + "", ] for category, post in self.posts: line = "1%s\t/%s/%s\t%s\t70" % ( - category['name'], + category["name"], settings.paths.category, - category['name'], - settings.site.name + category["name"], + settings.site.name, ) lines.append(line) - lines.append('') + lines.append("") writepath( - self.renderfile.replace( - settings.filenames.html, - settings.filenames.gopher - ), - "\r\n".join(lines) + self.renderfile.replace(settings.filenames.html, settings.filenames.gopher), + "\r\n".join(lines), ) async def render(self): if self.exists: return logger.info("rendering %s", self.name) - r = J2.get_template(self.template).render({ - 'baseurl': settings.site.get('url'), - 'post': self.jsonld, - 'site': settings.site, - 'menu': settings.menu, - 'meta': settings.meta, - 'posts': self.posts, - 'fnames': settings.filenames - }) + r = J2.get_template(self.template).render( + { + "baseurl": settings.site.get("url"), + "post": self.jsonld, + "site": settings.site, + "menu": settings.menu, + "meta": settings.meta, + "posts": self.posts, + "fnames": settings.filenames, + } + ) writepath(self.renderfile, r) await self.render_gopher()@@ -1261,14 +1134,16 @@ self.mtime = mtime(self.fpath)
self.fname, self.fext = os.path.splitext(os.path.basename(fpath)) self.resized_images = [ (k, self.Resized(self, k)) - for k in settings.photo.get('sizes').keys() + for k in settings.photo.get("sizes").keys() if k < max(self.width, self.height) ] if not len(self.resized_images): - self.resized_images.append(( - max(self.width, self.height), - self.Resized(self, max(self.width, self.height)) - )) + self.resized_images.append( + ( + max(self.width, self.height), + self.Resized(self, max(self.width, self.height)), + ) + ) @property def is_mainimg(self):@@ -1283,52 +1158,58 @@ "@context": "http://schema.org",
"@type": "ImageObject", "url": self.href, "image": self.href, - "thumbnail": struct({ - "@context": "http://schema.org", - "@type": "ImageObject", - "url": self.src, - "width": self.displayed.width, - "height": self.displayed.height, - }), + "thumbnail": struct( + { + "@context": "http://schema.org", + "@type": "ImageObject", + "url": self.src, + "width": self.displayed.width, + "height": self.displayed.height, + } + ), "name": os.path.basename(self.fpath), "encodingFormat": self.mime_type, "contentSize": self.mime_size, "width": self.linked.width, "height": self.linked.height, - "dateCreated": self.exif.get('CreateDate'), + "dateCreated": self.exif.get("CreateDate"), "exifData": [], "caption": self.caption, "headline": self.title, - "representativeOfPage": False + "representativeOfPage": False, } for k, v in self.exif.items(): - r["exifData"].append({ - "@type": "PropertyValue", - "name": k, - "value": v - }) + r["exifData"].append({"@type": "PropertyValue", "name": k, "value": v}) if self.is_photo: - r.update({ - "creator": settings.author, - "copyrightHolder": settings.author, - "license": settings.licence['_default'] - }) + r.update( + { + "creator": settings.author, + "copyrightHolder": settings.author, + "license": settings.licence["_default"], + } + ) if self.is_mainimg: r.update({"representativeOfPage": True}) - if self.exif['GPSLatitude'] != 0 and self.exif['GPSLongitude'] != 0: - r.update({ - "locationCreated": struct({ - "@context": "http://schema.org", - "@type": "Place", - "geo": struct({ - "@context": "http://schema.org", - "@type": "GeoCoordinates", - "latitude": self.exif['GPSLatitude'], - "longitude": self.exif['GPSLongitude'] - }) - }) - }) + if self.exif["GPSLatitude"] != 0 and self.exif["GPSLongitude"] != 0: + r.update( + { + "locationCreated": struct( + { + "@context": "http://schema.org", + "@type": "Place", + "geo": struct( + { + "@context": "http://schema.org", + "@type": "GeoCoordinates", + "latitude": self.exif["GPSLatitude"], + "longitude": self.exif["GPSLongitude"], + } + ), + } + ) + } + ) return struct(r) def __str__(self):@@ -1346,51 +1227,49 @@ def caption(self):
if len(self.mdimg.alt): return self.mdimg.alt else: - return self.meta.get('Description', '') + return self.meta.get("Description", "") @property def title(self): if len(self.mdimg.title): return self.mdimg.title else: - return self.meta.get('Headline', self.fname) + return self.meta.get("Headline", self.fname) @property def tags(self): - return list(set(self.meta.get('Subject', []))) + return list(set(self.meta.get("Subject", []))) @property def published(self): - return arrow.get( - self.meta.get('ReleaseDate', self.meta.get('ModifyDate')) - ) + return arrow.get(self.meta.get("ReleaseDate", self.meta.get("ModifyDate"))) @property def width(self): - return int(self.meta.get('ImageWidth')) + return int(self.meta.get("ImageWidth")) @property def height(self): - return int(self.meta.get('ImageHeight')) + return int(self.meta.get("ImageHeight")) @property def mime_type(self): - return str(self.meta.get('MIMEType', 'image/jpeg')) + return str(self.meta.get("MIMEType", "image/jpeg")) @property def mime_size(self): try: size = os.path.getsize(self.linked.fpath) except Exception as e: - logger.error('Failed to get mime size of %s', self.linked.fpath) - size = self.meta.get('FileSize', 0) + logger.error("Failed to get mime size of %s", self.linked.fpath) + size = self.meta.get("FileSize", 0) return size @property def displayed(self): ret = self.resized_images[0][1] for size, r in self.resized_images: - if size == settings.photo.get('default'): + if size == settings.photo.get("default"): ret = r return ret@@ -1414,11 +1293,11 @@ return self.linked.url
@property def is_photo(self): - r = settings.photo.get('re_author', None) + r = settings.photo.get("re_author", None) if not r: return False - cpr = self.meta.get('Copyright', '') - art = self.meta.get('Artist', '') + cpr = self.meta.get("Copyright", "") + art = self.meta.get("Artist", "") # both Artist and Copyright missing from EXIF if not cpr and not art: return False@@ -1430,29 +1309,29 @@
@property def exif(self): exif = { - 'Model': '', - 'FNumber': '', - 'ExposureTime': '', - 'FocalLength': '', - 'ISO': '', - 'LensID': '', - 'CreateDate': str(arrow.get(self.mtime)), - 'GPSLatitude': 0, - 'GPSLongitude': 0 + "Model": "", + "FNumber": "", + "ExposureTime": "", + "FocalLength": "", + "ISO": "", + "LensID": "", + "CreateDate": str(arrow.get(self.mtime)), + "GPSLatitude": 0, + "GPSLongitude": 0, } if not self.is_photo: return exif mapping = { - 'Model': ['Model'], - 'FNumber': ['FNumber', 'Aperture'], - 'ExposureTime': ['ExposureTime'], - 'FocalLength': ['FocalLength'], # ['FocalLengthIn35mmFormat'], - 'ISO': ['ISO'], - 'LensID': ['LensID', 'LensSpec', 'Lens'], - 'CreateDate': ['CreateDate', 'DateTimeOriginal'], - 'GPSLatitude': ['GPSLatitude'], - 'GPSLongitude': ['GPSLongitude'] + "Model": ["Model"], + "FNumber": ["FNumber", "Aperture"], + "ExposureTime": ["ExposureTime"], + "FocalLength": ["FocalLength"], # ['FocalLengthIn35mmFormat'], + "ISO": ["ISO"], + "LensID": ["LensID", "LensSpec", "Lens"], + "CreateDate": ["CreateDate", "DateTimeOriginal"], + "GPSLatitude": ["GPSLatitude"], + "GPSLongitude": ["GPSLongitude"], } for ekey, candidates in mapping.items():@@ -1469,7 +1348,7 @@ def _maybe_watermark(self, img):
if not self.is_photo: return img - wmarkfile = settings.paths.get('watermark') + wmarkfile = settings.paths.get("watermark") if not os.path.exists(wmarkfile): return img@@ -1497,7 +1376,7 @@
async def downsize(self): need = False for size, resized in self.resized_images: - if not resized.exists or settings.args.get('regenerate'): + if not resized.exists or settings.args.get("regenerate"): need = True break if not need:@@ -1507,11 +1386,11 @@ with wand.image.Image(filename=self.fpath) as img:
img.auto_orient() img = self._maybe_watermark(img) for size, resized in self.resized_images: - if not resized.exists or settings.args.get('regenerate'): + if not resized.exists or settings.args.get("regenerate"): logger.info( "resizing image: %s to size %d", os.path.basename(self.fpath), - size + size, ) await resized.make(img)@@ -1523,49 +1402,38 @@ self.crop = crop
@property def data(self): - with open(self.fpath, 'rb') as f: + with open(self.fpath, "rb") as f: encoded = base64.b64encode(f.read()) return "data:%s;base64,%s" % ( - self.parent.mime_type, encoded.decode('utf-8')) + self.parent.mime_type, + encoded.decode("utf-8"), + ) @property def suffix(self): - return settings.photo.get('sizes').get(self.size, '') + return settings.photo.get("sizes").get(self.size, "") @property def fname(self): - return "%s%s%s" % ( - self.parent.fname, - self.suffix, - self.parent.fext - ) + return "%s%s%s" % (self.parent.fname, self.suffix, self.parent.fext) @property def fpath(self): - return os.path.join( - self.parent.parent.renderdir, - self.fname - ) + return os.path.join(self.parent.parent.renderdir, self.fname) @property def url(self): return "%s/%s/%s" % ( - settings.site.get('url'), + settings.site.get("url"), self.parent.parent.name, - "%s%s%s" % ( - self.parent.fname, - self.suffix, - self.parent.fext - ) + "%s%s%s" % (self.parent.fname, self.suffix, self.parent.fext), ) @property def relpath(self): return "%s/%s" % ( - self.parent.parent.renderdir.replace( - settings.paths.get('build'), '' - ), - self.fname + self.parent.parent.renderdir.replace(settings.paths.get("build"), ""), + self.fname, ) @property@@ -1601,8 +1469,7 @@ if ratio > 2.4 and not self.crop:
size = int(size * 0.6) horizontal = not horizontal - if (horizontal and not self.crop) \ - or (not horizontal and self.crop): + if (horizontal and not self.crop) or (not horizontal and self.crop): w = size h = int(float(size / width) * height) else:@@ -1620,35 +1487,29 @@
if self.crop: thumb.liquid_rescale(self.size, self.size, 1, 1) - if self.parent.meta.get('FileType', 'jpeg').lower() == 'jpeg': + if self.parent.meta.get("FileType", "jpeg").lower() == "jpeg": thumb.compression_quality = 88 - thumb.unsharp_mask( - radius=1, - sigma=0.5, - amount=0.7, - threshold=0.5 - ) - thumb.format = 'pjpeg' + thumb.unsharp_mask(radius=1, sigma=0.5, amount=0.7, threshold=0.5) + thumb.format = "pjpeg" # this is to make sure pjpeg happens - with open(self.fpath, 'wb') as f: + with open(self.fpath, "wb") as f: logger.info("writing %s", self.fpath) thumb.save(file=f) # n, e = os.path.splitext(os.path.basename(self.fpath)) # webppath = self.fpath.replace(e, '.webp') # with open(webppath, 'wb') as f: - # logger.info("writing %s", webppath) - # thumb.format = 'webp' - # thumb.compression_quality = 88 - # thumb.save(file=f) - + # logger.info("writing %s", webppath) + # thumb.format = 'webp' + # thumb.compression_quality = 88 + # thumb.save(file=f) class PHPFile(object): @property def exists(self): - if settings.args.get('force'): + if settings.args.get("force"): return False if not os.path.exists(self.renderfile): return False@@ -1658,41 +1519,34 @@ return True
@property def mtime(self): - return mtime( - os.path.join( - settings.paths.get('tmpl'), - self.templatefile - ) - ) + return mtime(os.path.join(settings.paths.get("tmpl"), self.templatefile)) @property def renderfile(self): - raise ValueError('Not implemented') + raise ValueError("Not implemented") @property def templatefile(self): - raise ValueError('Not implemented') + raise ValueError("Not implemented") async def render(self): # if self.exists: - # return + # return await self._render() class Search(PHPFile): def __init__(self): - self.fpath = os.path.join( - settings.paths.get('build'), - 'search.sqlite' - ) + self.fpath = os.path.join(settings.paths.get("build"), "search.sqlite") self.db = sqlite3.connect(self.fpath) - self.db.execute('PRAGMA auto_vacuum = INCREMENTAL;') - self.db.execute('PRAGMA journal_mode = MEMORY;') - self.db.execute('PRAGMA temp_store = MEMORY;') - self.db.execute('PRAGMA locking_mode = NORMAL;') - self.db.execute('PRAGMA synchronous = FULL;') + self.db.execute("PRAGMA auto_vacuum = INCREMENTAL;") + self.db.execute("PRAGMA journal_mode = MEMORY;") + self.db.execute("PRAGMA temp_store = MEMORY;") + self.db.execute("PRAGMA locking_mode = NORMAL;") + self.db.execute("PRAGMA synchronous = FULL;") self.db.execute('PRAGMA encoding = "UTF-8";') - self.db.execute(''' + self.db.execute( + """ CREATE VIRTUAL TABLE IF NOT EXISTS data USING fts4( url, mtime,@@ -1704,26 +1558,29 @@ notindexed=category,
notindexed=url, notindexed=mtime, tokenize=porter - )''' - ) + )""" + ) self.is_changed = False def __exit__(self): if self.is_changed: self.db.commit() - self.db.execute('PRAGMA auto_vacuum;') + self.db.execute("PRAGMA auto_vacuum;") self.db.close() def check(self, name): ret = 0 - maybe = self.db.execute(''' + maybe = self.db.execute( + """ SELECT mtime FROM data WHERE name = ? - ''', (name,)).fetchone() + """, + (name,), + ).fetchone() if maybe: ret = int(maybe[0]) return ret@@ -1731,46 +1588,46 @@
def append(self, post): mtime = int(post.published.timestamp) check = self.check(post.name) - if (check and check < mtime): - self.db.execute(''' + if check and check < mtime: + self.db.execute( + """ DELETE FROM data WHERE - name=?''', (post.name,)) + name=?""", + (post.name,), + ) check = False if not check: - self.db.execute(''' + self.db.execute( + """ INSERT INTO data (url, mtime, name, title, category, content) VALUES (?,?,?,?,?,?); - ''', ( - post.url, - mtime, - post.name, - post.title, - post.category, - post.content - )) + """, + (post.url, mtime, post.name, post.title, post.category, post.content), + ) self.is_changed = True @property def templates(self): - return ['Search.j2.php', 'OpenSearch.j2.xml'] + return ["Search.j2.php", "OpenSearch.j2.xml"] async def _render(self): for template in self.templates: - r = J2.get_template(template).render({ - 'post': {}, - 'site': settings.site, - 'menu': settings.menu, - 'meta': settings.meta, - }) + r = J2.get_template(template).render( + { + "post": {}, + "site": settings.site, + "menu": settings.menu, + "meta": settings.meta, + } + ) target = os.path.join( - settings.paths.get('build'), - template.replace('.j2', '').lower() + settings.paths.get("build"), template.replace(".j2", "").lower() ) writepath(target, r)@@ -1787,86 +1644,73 @@ def add_redirect(self, source, target):
if target in self.gone: self.add_gone(source) else: - if '://' not in target: - target = "%s/%s" % (settings.site.get('url'), target) + if "://" not in target: + target = "%s/%s" % (settings.site.get("url"), target) self.redirect[source] = target @property def renderfile(self): - return os.path.join( - settings.paths.get('build'), - 'index.php' - ) + return os.path.join(settings.paths.get("build"), "index.php") @property def templatefile(self): - return '404.j2.php' + return "404.j2.php" async def _render(self): - r = J2.get_template(self.templatefile).render({ - 'post': {}, - 'site': settings.site, - 'menu': settings.menu, - 'gones': self.gone, - 'redirects': self.redirect - }) + r = J2.get_template(self.templatefile).render( + { + "post": {}, + "site": settings.site, + "menu": settings.menu, + "gones": self.gone, + "redirects": self.redirect, + } + ) writepath(self.renderfile, r) class WebhookPHP(PHPFile): @property def renderfile(self): - return os.path.join( - settings.paths.get('build'), - 'webhook.php' - ) + return os.path.join(settings.paths.get("build"), "webhook.php") @property def templatefile(self): - return 'Webhook.j2.php' + return "Webhook.j2.php" async def _render(self): - r = J2.get_template(self.templatefile).render({ - 'author': settings.author, - 'webmentionio': keys.webmentionio - }) + r = J2.get_template(self.templatefile).render( + {"author": settings.author, "webmentionio": keys.webmentionio} + ) writepath(self.renderfile, r) class MicropubPHP(PHPFile): @property def renderfile(self): - return os.path.join( - settings.paths.get('build'), - 'micropub.php' - ) + return os.path.join(settings.paths.get("build"), "micropub.php") @property def templatefile(self): - return 'Micropub.j2.php' + return "Micropub.j2.php" async def _render(self): - r = J2.get_template(self.templatefile).render({ - 'site': settings.site, - 'menu': settings.menu, - 'paths': settings.paths - }) + r = J2.get_template(self.templatefile).render( + {"site": settings.site, "menu": settings.menu, "paths": settings.paths} + ) writepath(self.renderfile, r) class Category(dict): - def __init__(self, name=''): + def __init__(self, name=""): self.name = name - self.trange = 'YYYY' + self.trange = "YYYY" def __setitem__(self, key, value): if key in self: raise LookupError( - "key '%s' already exists, colliding posts are: %s vs %s" % ( - key, - self[key].fpath, - value.fpath, - ) + "key '%s' already exists, colliding posts are: %s vs %s" + % (key, self[key].fpath, value.fpath) ) dict.__setitem__(self, key, value)@@ -1899,7 +1743,7 @@ def url(self):
if len(self.name): url = "%s/%s/%s/" % (settings.site.url, settings.paths.category, self.name) else: - url = '%s/' % (settings.site.url) + url = "%s/" % (settings.site.url) return url @property@@ -1914,9 +1758,7 @@ @property
def dpath(self): if len(self.name): return os.path.join( - settings.paths.build, - settings.paths.category, - self.name + settings.paths.build, settings.paths.category, self.name ) else: return settings.paths.build@@ -1935,33 +1777,24 @@ if y == self.newest_year:
url = self.url else: url = "%s%d/" % (self.url, y) - years.update({ - y: url - }) + years.update({y: url}) return years @property def mtime(self): - if len (self.sortedkeys) > 0: + if len(self.sortedkeys) > 0: return self[self.sortedkeys[0]].published.timestamp else: return 0 def feedpath(self, fname): - return os.path.join( - self.dpath, - settings.paths.feed, - fname - ) + return os.path.join(self.dpath, settings.paths.feed, fname) def get_posts(self, start=0, end=-1): - return [ - self[k].jsonld - for k in self.sortedkeys[start:end] - ] + return [self[k].jsonld for k in self.sortedkeys[start:end]] def is_uptodate(self, fpath, ts): - if settings.args.get('force'): + if settings.args.get("force"): return False if not os.path.exists(fpath): return False@@ -1972,69 +1805,55 @@
def newest(self, start=0, end=-1): if start == end: end = -1 - s = sorted( - [self[k].dt for k in self.sortedkeys[start:end]], - reverse=True - ) + s = sorted([self[k].dt for k in self.sortedkeys[start:end]], reverse=True) if len(s) > 0: - return s[0] # Timestamp in seconds since epoch + return s[0] # Timestamp in seconds since epoch else: return 0 @property def ctmplvars(self): return { - 'name': self.name, - 'url': self.url, - 'feed': self.feedurl, - 'title': self.title, + "name": self.name, + "url": self.url, + "feed": self.feedurl, + "title": self.title, } def tmplvars(self, posts=[], year=None): baseurl = self.url if year: - baseurl = '%s%s/' % (baseurl, year) + baseurl = "%s%s/" % (baseurl, year) return { - 'baseurl': baseurl, - 'site': settings.site, - 'menu': settings.menu, - 'meta': settings.meta, - 'category': { - 'name': self.name, - 'paginated': self.is_paginated, - 'url': self.url, - 'feed': self.feedurl, - 'title': self.title, - 'year': year, - 'years': self.years, + "baseurl": baseurl, + "site": settings.site, + "menu": settings.menu, + "meta": settings.meta, + "category": { + "name": self.name, + "paginated": self.is_paginated, + "url": self.url, + "feed": self.feedurl, + "title": self.title, + "year": year, + "years": self.years, }, - 'posts': posts, - 'fnames': settings.filenames + "posts": posts, + "fnames": settings.filenames, } def indexfpath(self, subpath=None, fname=settings.filenames.html): if subpath: - return os.path.join( - self.dpath, - subpath, - fname - ) + return os.path.join(self.dpath, subpath, fname) else: - return os.path.join( - self.dpath, - fname - ) + return os.path.join(self.dpath, fname) async def render_feed(self, xmlformat): - if 'json' == xmlformat: + if "json" == xmlformat: await self.render_json() return - logger.info( - 'rendering category "%s" %s feed', - self.name, - xmlformat - ) + logger.info('rendering category "%s" %s feed', self.name, xmlformat) start = 0 end = int(settings.pagination)@@ -2042,12 +1861,9 @@
fg = FeedGenerator() fg.id(self.feedurl) fg.title(self.title) - fg.author({ - 'name': settings.author.name, - 'email': settings.author.email - }) - fg.logo('%s/favicon.png' % settings.site.url) - fg.updated(arrow.get(self.mtime).to('utc').datetime) + fg.author({"name": settings.author.name, "email": settings.author.email}) + fg.logo("%s/favicon.png" % settings.site.url) + fg.updated(arrow.get(self.mtime).to("utc").datetime) fg.description(settings.site.headline) for k in reversed(self.sortedkeys[start:end]):@@ -2056,57 +1872,52 @@ fe = fg.add_entry()
fe.id(post.url) fe.title(post.title) - fe.author({ - 'name': settings.author.name, - 'email': settings.author.email - }) - fe.category({ - 'term': post.category, - 'label': post.category, - 'scheme': "%s/%s/%s/" % ( - settings.site.url, - settings.paths.category, - post.category - ) - }) + fe.author({"name": settings.author.name, "email": settings.author.email}) + fe.category( + { + "term": post.category, + "label": post.category, + "scheme": "%s/%s/%s/" + % (settings.site.url, settings.paths.category, post.category), + } + ) fe.published(post.published.datetime) fe.updated(arrow.get(post.dt).datetime) - fe.rights('%s %s %s' % ( - post.licence.upper(), - settings.author.name, - post.published.format('YYYY') - )) + fe.rights( + "%s %s %s" + % ( + post.licence.upper(), + settings.author.name, + post.published.format("YYYY"), + ) + ) - if xmlformat == 'rss': + if xmlformat == "rss": fe.link(href=post.url) - fe.content(post.html_content, type='CDATA') + fe.content(post.html_content, type="CDATA") if post.is_photo: fe.enclosure( post.photo.href, "%d" % post.photo.mime_size, post.photo.mime_type, ) - elif xmlformat == 'atom': - fe.link( - href=post.url, - rel='alternate', - type='text/html' - ) - fe.content(src=post.url, type='text/html') + elif xmlformat == "atom": + fe.link(href=post.url, rel="alternate", type="text/html") + fe.content(src=post.url, type="text/html") fe.summary(post.summary) - if xmlformat == 'rss': + if xmlformat == "rss": fg.link(href=self.feedurl) writepath(self.feedpath(settings.filenames.rss), fg.rss_str(pretty=True)) - elif xmlformat == 'atom': - fg.link(href=self.feedurl, rel='self') - fg.link(href=settings.meta.get('hub'), rel='hub') + elif xmlformat == "atom": + fg.link(href=self.feedurl, rel="self") + fg.link(href=settings.meta.get("hub"), rel="hub") writepath(self.feedpath(settings.filenames.atom), fg.atom_str(pretty=True)) async def render_json(self): - logger.info('rendering category "%s" JSON feed',self.name) + logger.info('rendering category "%s" JSON feed', self.name) js = { "version": "https://jsonfeed.org/version/1",@@ -2118,10 +1929,10 @@ "name": settings.author.name,
"url": settings.author.url, "avatar": settings.author.image, }, - "items": [] + "items": [], } - for k in reversed(self.sortedkeys[0:int(settings.pagination)]): + for k in reversed(self.sortedkeys[0 : int(settings.pagination)]): post = self[k] pjs = { "id": post.url,@@ -2133,54 +1944,49 @@ }
if len(post.summary): pjs.update({"summary": post.txt_summary}) if post.is_photo: - pjs.update({"attachment": { - "url": post.photo.href, - "mime_type": post.photo.mime_type, - "size_in_bytes": "%d" % post.photo.mime_size - }}) + pjs.update( + { + "attachment": { + "url": post.photo.href, + "mime_type": post.photo.mime_type, + "size_in_bytes": "%d" % post.photo.mime_size, + } + } + ) js["items"].append(pjs) writepath( self.feedpath(settings.filenames.json), - json.dumps(js, indent=4, ensure_ascii=False) + json.dumps(js, indent=4, ensure_ascii=False), ) async def render_flat(self): - logger.info('rendering flat archive for %s', self.name) - r = J2.get_template(self.template).render( - self.tmplvars(self.get_posts()) - ) + logger.info("rendering flat archive for %s", self.name) + r = J2.get_template(self.template).render(self.tmplvars(self.get_posts())) writepath(self.indexfpath(), r) async def render_gopher(self): - lines = [ - '%s - %s' % (self.name, settings.site.name), - '', - '' - ] + lines = ["%s - %s" % (self.name, settings.site.name), "", ""] for post in self.get_posts(): line = "0%s\t/%s/%s\t%s\t70" % ( post.headline, post.name, settings.filenames.txt, - settings.site.name + settings.site.name, ) lines.append(line) - if (len(post.description)): + if len(post.description): lines.extend(str(PandocHTML2TXT(post.description)).split("\n")) - if isinstance(post['image'], list): - for img in post['image']: + if isinstance(post["image"], list): + for img in post["image"]: line = "I%s\t/%s/%s\t%s\t70" % ( img.headline, post.name, img.name, - settings.site.name + settings.site.name, ) lines.append(line) - lines.append('') - writepath( - self.indexfpath(fname=settings.filenames.gopher), - "\r\n".join(lines) - ) + lines.append("") + writepath(self.indexfpath(fname=settings.filenames.gopher), "\r\n".join(lines)) async def render_archives(self): for year in self.years.keys():@@ -2190,9 +1996,9 @@ tyear = None
else: fpath = self.indexfpath("%d" % (year)) tyear = year - y = arrow.get("%d" % year, self.trange).to('utc') - tsmin = y.floor('year').timestamp - tsmax = y.ceil('year').timestamp + y = arrow.get("%d" % year, self.trange).to("utc") + tsmin = y.floor("year").timestamp + tsmax = y.ceil("year").timestamp start = len(self.sortedkeys) end = 0@@ -2213,20 +2019,20 @@ # I don't know why end needs the +1, but without that
# some posts disappear # TODO figure this out... self.get_posts(start, end + 1), - tyear + tyear, ) ) writepath(fpath, r) async def render_feeds(self): m = { - 'rss': self.feedpath(settings.filenames.rss), - 'atom': self.feedpath(settings.filenames.atom), - 'json': self.feedpath(settings.filenames.json) + "rss": self.feedpath(settings.filenames.rss), + "atom": self.feedpath(settings.filenames.atom), + "json": self.feedpath(settings.filenames.json), } for ft, path in m.items(): if not self.is_uptodate(path, self.newest()): - logger.info('%s outdated, generating new', ft) + logger.info("%s outdated, generating new", ft) await self.render_feed(ft) async def render(self):@@ -2253,10 +2059,7 @@ self[post.url] = post.mtime
@property def renderfile(self): - return os.path.join( - settings.paths.get('build'), - settings.filenames.sitemap - ) + return os.path.join(settings.paths.get("build"), settings.filenames.sitemap) async def render(self): if not len(self):@@ -2265,129 +2068,105 @@
if self.mtime >= sorted(self.values())[-1]: return - sitemap = etree.Element("urlset", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9") + sitemap = etree.Element( + "urlset", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" + ) xmldoc = etree.ElementTree(sitemap) for url, mtime in self.items(): e = etree.SubElement(sitemap, "url") loc = etree.SubElement(e, "loc").text = url lastmod = etree.SubElement(e, "lastmod").text = str(arrow.get(mtime)) s = etree.tostring( - xmldoc, - encoding='utf-8', - xml_declaration=True, - pretty_print=True + xmldoc, encoding="utf-8", xml_declaration=True, pretty_print=True ) - with open(self.renderfile, 'wb') as f: + with open(self.renderfile, "wb") as f: f.write(s) async def render_txt(self): if len(self) > 0: if self.mtime >= sorted(self.values())[-1]: return - with open(self.renderfile, 'wt') as f: + with open(self.renderfile, "wt") as f: f.write("\n".join(sorted(self.keys()))) class WebmentionIO(object): def __init__(self): self.params = { - 'token': '%s' % (keys.webmentionio.get('token')), - 'since': '%s' % str(self.since), - 'domain': '%s' % (keys.webmentionio.get('domain')) + "token": "%s" % (keys.webmentionio.get("token")), + "since": "%s" % str(self.since), + "domain": "%s" % (keys.webmentionio.get("domain")), } - self.url = 'https://webmention.io/api/mentions' + self.url = "https://webmention.io/api/mentions" @property def since(self): newest = 0 - content = settings.paths.get('content') - for e in glob.glob(os.path.join(content, '*', '*', '*.md')): + content = settings.paths.get("content") + for e in glob.glob(os.path.join(content, "*", "*", "*.md")): if os.path.basename(e) == settings.filenames.md: continue # filenames are like [received epoch]-[slugified source url].md try: - mtime = int(os.path.basename(e).split('-')[0]) + mtime = int(os.path.basename(e).split("-")[0]) except Exception as exc: - logger.error( - 'int conversation failed: %s, file was: %s', - exc, - e - ) + logger.error("int conversation failed: %s, file was: %s", exc, e) continue if mtime > newest: newest = mtime return arrow.get(newest + 1) def makecomment(self, webmention): - if 'published_ts' in webmention.get('data'): - maybe = webmention.get('data').get('published') - if not maybe or maybe == 'None': - dt = arrow.get(webmention.get('verified_date')) + if "published_ts" in webmention.get("data"): + maybe = webmention.get("data").get("published") + if not maybe or maybe == "None": + dt = arrow.get(webmention.get("verified_date")) else: - dt = arrow.get(webmention.get('data').get('published')) + dt = arrow.get(webmention.get("data").get("published")) - slug = os.path.split(urlparse(webmention.get('target')).path.lstrip('/'))[0] + slug = os.path.split(urlparse(webmention.get("target")).path.lstrip("/"))[0] # ignore selfpings - if slug == settings.site.get('name'): + if slug == settings.site.get("name"): return - fdir = glob.glob( - os.path.join( - settings.paths.get('content'), - '*', - slug - ) - ) + fdir = glob.glob(os.path.join(settings.paths.get("content"), "*", slug)) if not len(fdir): - logger.error( - "couldn't find post for incoming webmention: %s", - webmention - ) + logger.error("couldn't find post for incoming webmention: %s", webmention) return elif len(fdir) > 1: - logger.error( - "multiple posts found for incoming webmention: %s", - webmention - ) + logger.error("multiple posts found for incoming webmention: %s", webmention) return fdir = fdir.pop() fpath = os.path.join( - fdir, - "%d-%s.md" % ( - dt.timestamp, - url2slug(webmention.get('source')) - ) + fdir, "%d-%s.md" % (dt.timestamp, url2slug(webmention.get("source"))) ) - author = webmention.get('data', {}).get('author', None) + author = webmention.get("data", {}).get("author", None) if not author: - logger.error('missing author info on webmention; skipping') + logger.error("missing author info on webmention; skipping") return meta = { - 'author': { - 'name': author.get('name', ''), - 'url': author.get('url', ''), - 'photo': author.get('photo', '') + "author": { + "name": author.get("name", ""), + "url": author.get("url", ""), + "photo": author.get("photo", ""), }, - 'date': str(dt), - 'source': webmention.get('source'), - 'target': webmention.get('target'), - 'type': webmention.get('activity').get('type', 'webmention') + "date": str(dt), + "source": webmention.get("source"), + "target": webmention.get("target"), + "type": webmention.get("activity").get("type", "webmention"), } try: - txt = webmention.get('data').get('content', '').strip() + txt = webmention.get("data").get("content", "").strip() except Exception as e: - txt = '' + txt = "" pass - r = "---\n%s\n---\n\n%s\n" % ( - utfyamldump(meta), - txt - ) + r = "---\n%s\n---\n\n%s\n" % (utfyamldump(meta), txt) writepath(fpath, r) def run(self):@@ -2397,45 +2176,46 @@ if webmentions.status_code != requests.codes.ok:
return try: mentions = webmentions.json() - for webmention in mentions.get('links'): + for webmention in mentions.get("links"): self.makecomment(webmention) except ValueError as e: - logger.error('failed to query webmention.io: %s', e) + logger.error("failed to query webmention.io: %s", e) pass # class GranaryIO(dict): - # granary = 'https://granary.io/url' - # convert_to = ['as2', 'mf2-json', 'jsonfeed'] +# granary = 'https://granary.io/url' +# convert_to = ['as2', 'mf2-json', 'jsonfeed'] + +# def __init__(self, source): +# self.source = source - # def __init__(self, source): - # self.source = source +# def run(self): +# for c in self.convert_to: +# p = { +# 'url': self.source, +# 'input': html, +# 'output': c +# } +# r = requests.get(self.granary, params=p) +# logger.info("queried granary.io for %s for url: %s", c, self.source) +# if r.status_code != requests.codes.ok: +# continue +# try: +# self[c] = webmentions.text +# except ValueError as e: +# logger.error('failed to query granary.io: %s', e) +# pass - # def run(self): - # for c in self.convert_to: - # p = { - # 'url': self.source, - # 'input': html, - # 'output': c - # } - # r = requests.get(self.granary, params=p) - # logger.info("queried granary.io for %s for url: %s", c, self.source) - # if r.status_code != requests.codes.ok: - # continue - # try: - # self[c] = webmentions.text - # except ValueError as e: - # logger.error('failed to query granary.io: %s', e) - # pass def dat(): for url in settings.site.sameAs: if "dat://" in url: - p = os.path.join(settings.paths.build, '.well-known') + p = os.path.join(settings.paths.build, ".well-known") if not os.path.isdir(p): os.makedirs(p) - p = os.path.join(settings.paths.build, '.well-known', 'dat') - if not os.path.exists(p) or settings.args.get('force'): + p = os.path.join(settings.paths.build, ".well-known", "dat") + if not os.path.exists(p) or settings.args.get("force"): writepath(p, "%s\nTTL=3600" % (url))@@ -2444,7 +2224,7 @@ start = int(round(time.time() * 1000))
last = 0 # this needs to be before collecting the 'content' itself - if not settings.args.get('offline') and not settings.args.get('noservices'): + if not settings.args.get("offline") and not settings.args.get("noservices"): incoming = WebmentionIO() incoming.run()@@ -2452,7 +2232,7 @@ queue = AQ()
send = [] firsttimepublished = [] - content = settings.paths.get('content') + content = settings.paths.get("content") rules = IndexPHP() micropub = MicropubPHP()@@ -2465,9 +2245,9 @@ sitemap = Sitemap()
search = Search() categories = {} frontposts = Category() - home = Home(settings.paths.get('home')) + home = Home(settings.paths.get("home")) - for e in sorted(glob.glob(os.path.join(content, '*', '*', settings.filenames.md))): + for e in sorted(glob.glob(os.path.join(content, "*", "*", settings.filenames.md))): post = Singular(e) # deal with images, if needed for i in post.images.values():@@ -2482,10 +2262,10 @@ queue.put(post.copyfiles())
# skip draft posts from anything further if post.is_future: - logger.info('%s is for the future', post.name) + logger.info("%s is for the future", post.name) continue elif not os.path.exists(post.renderfile): - logger.debug('%s seems to be fist time published', post.name) + logger.debug("%s seems to be fist time published", post.name) firsttimepublished.append(post) # add post to search database@@ -2519,10 +2299,10 @@ queue.put(search.render())
queue.put(sitemap.render()) # make gone and redirect arrays for PHP - for e in glob.glob(os.path.join(content, '*', '*.del')): + for e in glob.glob(os.path.join(content, "*", "*.del")): post = Gone(e) rules.add_gone(post.source) - for e in glob.glob(os.path.join(content, '*', '*.url')): + for e in glob.glob(os.path.join(content, "*", "*.url")): post = Redirect(e) rules.add_redirect(post.source, post.target) # render 404 fallback PHP@@ -2539,43 +2319,44 @@ # actually run all the render & copy tasks
queue.run() # copy static files - for e in glob.glob(os.path.join(content, '*.*')): - if e.endswith('.md'): + for e in glob.glob(os.path.join(content, "*.*")): + if e.endswith(".md"): continue - t = os.path.join(settings.paths.get('build'), os.path.basename(e)) + t = os.path.join(settings.paths.get("build"), os.path.basename(e)) if os.path.exists(t) and mtime(e) <= mtime(t): continue cp(e, t) end = int(round(time.time() * 1000)) - logger.info('process took %d ms' % (end - start)) + logger.info("process took %d ms" % (end - start)) - if not settings.args.get('offline'): + if not settings.args.get("offline"): # upload site try: - logger.info('starting syncing') + logger.info("starting syncing") os.system( - "rsync -avuhH --delete-after %s/ %s/" % ( - settings.paths.get('build'), - '%s/%s' % (settings.syncserver, - settings.paths.get('remotewww')) + "rsync -avuhH --delete-after %s/ %s/" + % ( + settings.paths.get("build"), + "%s/%s" % (settings.syncserver, settings.paths.get("remotewww")), ) ) - logger.info('syncing finished') + logger.info("syncing finished") except Exception as e: - logger.error('syncing failed: %s', e) + logger.error("syncing failed: %s", e) - if not settings.args.get('offline') and not settings.args.get('noservices'): - logger.info('sending webmentions') + if not settings.args.get("offline") and not settings.args.get("noservices"): + logger.info("sending webmentions") for wm in send: queue.put(wm.send()) queue.run() - logger.info('sending webmentions finished') + logger.info("sending webmentions finished") for post in firsttimepublished: queue.put(post.save_memento()) queue.put(post.save_to_archiveorg()) queue.run() -if __name__ == '__main__': + +if __name__ == "__main__": make()
@@ -11,10 +11,11 @@ import hashlib
import os import settings + class Pandoc(str): - in_format = 'html' + in_format = "html" in_options = [] - out_format = 'plain' + out_format = "plain" out_options = [] columns = None@@ -25,18 +26,14 @@
@property def cachefile(self): return os.path.join( - settings.tmpdir, - "%s_%s.pandoc" % ( - self.__class__.__name__, - self.hash - ) + settings.tmpdir, "%s_%s.pandoc" % (self.__class__.__name__, self.hash) ) @property def cache(self): if not os.path.exists(self.cachefile): return False - with open(self.cachefile, 'rt') as f: + with open(self.cachefile, "rt") as f: self.result = f.read() return True@@ -44,38 +41,26 @@ def __init__(self, text):
self.source = text if self.cache: return - conv_to = '--to=%s' % (self.out_format) - if (len(self.out_options)): - conv_to = '%s+%s' % ( - conv_to, - '+'.join(self.out_options) - ) + conv_to = "--to=%s" % (self.out_format) + if len(self.out_options): + conv_to = "%s+%s" % (conv_to, "+".join(self.out_options)) + + conv_from = "--from=%s" % (self.in_format) + if len(self.in_options): + conv_from = "%s+%s" % (conv_from, "+".join(self.in_options)) - conv_from = '--from=%s' % (self.in_format) - if (len(self.in_options)): - conv_from = '%s+%s' % ( - conv_from, - '+'.join(self.in_options) - ) - is_pandoc_version2 = False try: - version = subprocess.check_output(['pandoc', '-v']) - if version.startswith(b'pandoc 2'): + version = subprocess.check_output(["pandoc", "-v"]) + if version.startswith(b"pandoc 2"): is_pandoc_version2 = True except OSError: print("Error: pandoc is not installed!") - - cmd = [ - 'pandoc', - '-o-', - conv_to, - conv_from, - '--no-highlight' - ] + + cmd = ["pandoc", "-o-", conv_to, conv_from, "--no-highlight"] if is_pandoc_version2: # Only pandoc v2 and higher support quiet param - cmd.append('--quiet') + cmd.append("--quiet") if self.columns: cmd.append(self.columns)@@ -89,14 +74,10 @@ )
stdout, stderr = p.communicate(input=text.encode()) if stderr: - logging.warning( - "Error during pandoc covert:\n\t%s\n\t%s", - cmd, - stderr - ) - r = stdout.decode('utf-8').strip() + logging.warning("Error during pandoc covert:\n\t%s\n\t%s", cmd, stderr) + r = stdout.decode("utf-8").strip() self.result = r - with open(self.cachefile, 'wt') as f: + with open(self.cachefile, "wt") as f: f.write(self.result) def __str__(self):@@ -107,65 +88,65 @@ return str(self.result)
class PandocMD2HTML(Pandoc): - in_format = 'markdown' + in_format = "markdown" in_options = [ - 'footnotes', - 'pipe_tables', - 'strikeout', + "footnotes", + "pipe_tables", + "strikeout", # 'superscript', # 'subscript', - 'raw_html', - 'definition_lists', - 'backtick_code_blocks', - 'fenced_code_attributes', - 'shortcut_reference_links', - 'lists_without_preceding_blankline', - 'autolink_bare_uris', + "raw_html", + "definition_lists", + "backtick_code_blocks", + "fenced_code_attributes", + "shortcut_reference_links", + "lists_without_preceding_blankline", + "autolink_bare_uris", ] - out_format = 'html5' + out_format = "html5" out_options = [] class PandocHTML2MD(Pandoc): - in_format = 'html' + in_format = "html" in_options = [] - out_format = 'markdown' + out_format = "markdown" out_options = [ - 'footnotes', - 'pipe_tables', - 'strikeout', - 'raw_html', - 'definition_lists', - 'backtick_code_blocks', - 'fenced_code_attributes', - 'shortcut_reference_links', - 'lists_without_preceding_blankline', - 'autolink_bare_uris', + "footnotes", + "pipe_tables", + "strikeout", + "raw_html", + "definition_lists", + "backtick_code_blocks", + "fenced_code_attributes", + "shortcut_reference_links", + "lists_without_preceding_blankline", + "autolink_bare_uris", ] class PandocMD2TXT(Pandoc): - in_format = 'markdown' + in_format = "markdown" in_options = [ - 'footnotes', - 'pipe_tables', - 'strikeout', - 'raw_html', - 'definition_lists', - 'backtick_code_blocks', - 'fenced_code_attributes', - 'shortcut_reference_links', - 'lists_without_preceding_blankline', - 'autolink_bare_uris', + "footnotes", + "pipe_tables", + "strikeout", + "raw_html", + "definition_lists", + "backtick_code_blocks", + "fenced_code_attributes", + "shortcut_reference_links", + "lists_without_preceding_blankline", + "autolink_bare_uris", ] - out_format = 'plain' + out_format = "plain" out_options = [] - columns = '--columns=80' + columns = "--columns=80" class PandocHTML2TXT(Pandoc): - in_format = 'html' + in_format = "html" in_options = [] - out_format = 'plain' + out_format = "plain" out_options = [] - columns = '--columns=80' + columns = "--columns=80"
@@ -17,231 +17,220 @@ __setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__ -base = os.path.abspath(os.path.expanduser('~/Projects/petermolnar.net')) -syncserver = 'liveserver:/web/petermolnar.net' +base = os.path.abspath(os.path.expanduser("~/Projects/petermolnar.net")) +syncserver = "liveserver:/web/petermolnar.net" pagination = 42 -notinfeed = ['note'] -flat = ['article', 'journal'] -displaydate = 'YYYY-MM-DD HH:mm' +notinfeed = ["note"] +flat = ["article", "journal"] +displaydate = "YYYY-MM-DD HH:mm" mementostartime = 1561192582 -licence = struct({ - 'article': 'CC-BY-4.0', - 'journal': 'CC-BY-NC-4.0', - '_default': 'CC-BY-NC-ND-4.0' -}) - -author = struct({ - "@context": "http://schema.org", - "@type": "Person", - "image": "https://petermolnar.net/favicon.jpg", - "email": "mail@petermolnar.net", - "url": "https://petermolnar.net/", - "name": "Peter Molnar" -}) +licence = struct( + {"article": "CC-BY-4.0", "journal": "CC-BY-NC-4.0", "_default": "CC-BY-NC-ND-4.0"} +) -site = struct({ - "@context": "http://schema.org", - "@type": "WebSite", - "headline": "Peter Molnar", - "url": "https://petermolnar.net", - "name": "petermolnar.net", - "image": "https://petermolnar.net/favicon.ico", - "license": "https://spdx.org/licenses/%s.html" % (licence['_default']), - "sameAs": [ - ], - "author": { +author = struct( + { "@context": "http://schema.org", "@type": "Person", "image": "https://petermolnar.net/favicon.jpg", "email": "mail@petermolnar.net", "url": "https://petermolnar.net/", "name": "Peter Molnar", - "sameAs": [ - "https://github.com/petermolnar", - "https://petermolnar.net/cv.html", - "xmpp:mail@petermolnar.net", - "https://wa.me/447592011721", - "https://t.me/petermolnar", - "https://twitter.com/petermolnar" - ], - "follows": "https://petermolnar.net/following.opml" - }, - "publisher": { + } +) + +site = struct( + { "@context": "http://schema.org", - "@type": "Organization", - "logo": { - "@context": "http://schema.org", - "@type": "ImageObject", - "url": "https://petermolnar.net/favicon.jpg" - }, - "url": "https://petermolnar.net/", + "@type": "WebSite", + "headline": "Peter Molnar", + "url": "https://petermolnar.net", "name": "petermolnar.net", - "email": "webmaster@petermolnar.net" - }, - "potentialAction": [ - { + "image": "https://petermolnar.net/favicon.ico", + "license": "https://spdx.org/licenses/%s.html" % (licence["_default"]), + "sameAs": [], + "author": { "@context": "http://schema.org", - "@type": "SearchAction", - "target": "https://petermolnar.net/search.php?q={q}", - "query-input": "required name=q", - "url": "https://petermolnar.net/search.php" + "@type": "Person", + "image": "https://petermolnar.net/favicon.jpg", + "email": "mail@petermolnar.net", + "url": "https://petermolnar.net/", + "name": "Peter Molnar", + "sameAs": [ + "https://github.com/petermolnar", + "https://petermolnar.net/cv.html", + "xmpp:mail@petermolnar.net", + "https://wa.me/447592011721", + "https://t.me/petermolnar", + "https://twitter.com/petermolnar", + ], + "follows": "https://petermolnar.net/following.opml", }, - { + "publisher": { "@context": "http://schema.org", - "@type": "FollowAction", - "url": "https://petermolnar.net/follow/", - "name": "follow" + "@type": "Organization", + "logo": { + "@context": "http://schema.org", + "@type": "ImageObject", + "url": "https://petermolnar.net/favicon.jpg", + }, + "url": "https://petermolnar.net/", + "name": "petermolnar.net", + "email": "webmaster@petermolnar.net", }, - { - "@context": "http://schema.org", - "@type": "DonateAction", - "description": "Monzo", - "name": "monzo", - "url": "https://monzo.me/petermolnar/", - "recipient": author - }, - { - "@context": "http://schema.org", - "@type": "DonateAction", - "description": "Paypal", - "name": "paypal", - "url": "https://paypal.me/petermolnar/", - "recipient": author - } - ] -}) + "potentialAction": [ + { + "@context": "http://schema.org", + "@type": "SearchAction", + "target": "https://petermolnar.net/search.php?q={q}", + "query-input": "required name=q", + "url": "https://petermolnar.net/search.php", + }, + { + "@context": "http://schema.org", + "@type": "FollowAction", + "url": "https://petermolnar.net/follow/", + "name": "follow", + }, + { + "@context": "http://schema.org", + "@type": "DonateAction", + "description": "Monzo", + "name": "monzo", + "url": "https://monzo.me/petermolnar/", + "recipient": author, + }, + { + "@context": "http://schema.org", + "@type": "DonateAction", + "description": "Paypal", + "name": "paypal", + "url": "https://paypal.me/petermolnar/", + "recipient": author, + }, + ], + } +) -menu = { - 'home': { - 'url': '%s/' % site['url'], - 'text': 'home', - }, - 'photo': { - 'url': '%s/category/photo/' % site['url'], - 'text': 'photos', - }, - 'journal': { - 'url': '%s/category/journal/' % site['url'], - 'text': 'journal', - }, - 'article': { - 'url': '%s/category/article/' % site['url'], - 'text': 'IT', - }, - 'note': { - 'url': '%s/category/note/' % site['url'], - 'text': 'notes' +menu = stuct( + { + "home": {"url": "%s/" % site["url"], "text": "home"}, + "photo": {"url": "%s/category/photo/" % site["url"], "text": "photos"}, + "journal": {"url": "%s/category/journal/" % site["url"], "text": "journal"}, + "article": {"url": "%s/category/article/" % site["url"], "text": "IT"}, + "note": {"url": "%s/category/note/" % site["url"], "text": "notes"}, } -} +) -meta = struct({ - 'webmention': 'https://webmention.io/petermolnar.net/webmention', - 'pingback': 'https://webmention.io/petermolnar.net/xmlrpc', - 'hub': 'https://petermolnar.superfeedr.com/', - 'authorization_endpoint': 'https://indieauth.com/auth', - 'token_endpoint': 'https://tokens.indieauth.com/token', - 'micropub': 'https://petermolnar.net/micropub.php', - #'microsub': 'https://aperture.p3k.io/microsub/83' -}) +meta = struct( + { + "webmention": "https://webmention.io/petermolnar.net/webmention", + "pingback": "https://webmention.io/petermolnar.net/xmlrpc", + "hub": "https://petermolnar.superfeedr.com/", + "authorization_endpoint": "https://indieauth.com/auth", + "token_endpoint": "https://tokens.indieauth.com/token", + "micropub": "https://petermolnar.net/micropub.php", + #'microsub': 'https://aperture.p3k.io/microsub/83' + } +) -paths = struct({ - 'content': os.path.join(base, 'content'), - 'tmpl': os.path.join(base, 'nasg', 'templates'), - 'watermark': os.path.join(base, 'nasg', 'templates', 'watermark.png'), - 'build': os.path.join(base, 'www'), - 'queue': os.path.join(base, 'queue'), - 'remotewww': 'web', - 'remotequeue': 'queue', - 'micropub': os.path.join(base, 'content', 'note'), - 'home': os.path.join(base, 'content', 'home', 'index.md'), - 'category': 'category', - 'feed': 'feed' -}) +paths = struct( + { + "content": os.path.join(base, "content"), + "tmpl": os.path.join(base, "nasg", "templates"), + "watermark": os.path.join(base, "nasg", "templates", "watermark.png"), + "build": os.path.join(base, "www"), + "queue": os.path.join(base, "queue"), + "remotewww": "web", + "remotequeue": "queue", + "micropub": os.path.join(base, "content", "note"), + "home": os.path.join(base, "content", "home", "index.md"), + "category": "category", + "feed": "feed", + } +) -filenames = struct({ - 'rss': 'index.xml', - 'atom': 'atom.xml', - 'json': 'index.json', - 'md': 'index.md', - 'txt': 'index.txt', - 'html': 'index.html', - 'gopher': 'gophermap', - 'oembed_xml': 'oembed.xml', - 'oembed_json': 'oembed.json', - 'memento': 'memento.html', - 'sitemap': 'sitemap.xml' -}) +filenames = struct( + { + "rss": "index.xml", + "atom": "atom.xml", + "json": "index.json", + "md": "index.md", + "txt": "index.txt", + "html": "index.html", + "gopher": "gophermap", + "oembed_xml": "oembed.xml", + "oembed_json": "oembed.json", + "memento": "memento.html", + "sitemap": "sitemap.xml", + } +) -datignore = [ - '.git', - '.dat', - '**.php' -] +datignore = [".git", ".dat", "**.php"] -photo = struct({ - 're_author': re.compile(r'(?:P[eé]ter Moln[aá]r)|(?:Moln[aá]r P[eé]ter)|(?:petermolnar\.(?:eu|net))'), - 'default': 720, - 'sizes': { - #90 = s - #360 = m - 720: '', - 1280: '_b', - }, - 'earlyyears': 2014 -}) +photo = struct( + { + "re_author": re.compile( + r"(?:P[eé]ter Moln[aá]r)|(?:Moln[aá]r P[eé]ter)|(?:petermolnar\.(?:eu|net))" + ), + "default": 720, + "sizes": { + # 90 = s + # 360 = m + 720: "", + 1280: "_b", + }, + "earlyyears": 2014, + } +) -#symlinks = { - #'files/a-view-from-barbican-1280x720.jpg': 'a-view-from-barbican/a-view-from-barbican_b.jpg', - #'files/hills_from_beachy_head-540x226.jpg': 'hills-from-beachy-head/hills-from-beachy-head.jpg', - #'files/seven_sisters_from_beachy_head-540x304.jpg': 'seven-sisters-from-beachy-head/seven-sisters-from-beachy-head.jpg', - #'files/the_countryside-540x304.jpg': 'the-countryside/the-countryside.jpg', - #'files/MGP0538-540x358.jpg': '', - #'files/IMGP0539-540x358.jpg': '', - #'files/IMGP0538-540x358.jpg': '', -#} +# symlinks = { +#'files/a-view-from-barbican-1280x720.jpg': 'a-view-from-barbican/a-view-from-barbican_b.jpg', +#'files/hills_from_beachy_head-540x226.jpg': 'hills-from-beachy-head/hills-from-beachy-head.jpg', +#'files/seven_sisters_from_beachy_head-540x304.jpg': 'seven-sisters-from-beachy-head/seven-sisters-from-beachy-head.jpg', +#'files/the_countryside-540x304.jpg': 'the-countryside/the-countryside.jpg', +#'files/MGP0538-540x358.jpg': '', +#'files/IMGP0539-540x358.jpg': '', +#'files/IMGP0538-540x358.jpg': '', +# } -tmpdir = os.path.join(gettempdir(),'nasg') +tmpdir = os.path.join(gettempdir(), "nasg") if not os.path.isdir(tmpdir): os.makedirs(tmpdir) -_parser = argparse.ArgumentParser(description='Parameters for NASG') +_parser = argparse.ArgumentParser(description="Parameters for NASG") _booleanparams = { - 'regenerate': 'force (re)downsizing images', - 'force': 'force (re)rendering HTML', - 'debug': 'set logging to debug level', - 'quiet': 'show only errors', - 'offline': 'offline mode - no syncing, no querying services, etc.', - 'noping': 'make dummy webmention entries and don\'t really send them', - 'noservices': 'skip querying any service but do sync the website', - 'memento': 'try to fetch mementos from archive.org' + "regenerate": "force (re)downsizing images", + "force": "force (re)rendering HTML", + "debug": "set logging to debug level", + "quiet": "show only errors", + "offline": "offline mode - no syncing, no querying services, etc.", + "noping": "make dummy webmention entries and don't really send them", + "noservices": "skip querying any service but do sync the website", + "memento": "try to fetch mementos from archive.org", } for k, v in _booleanparams.items(): - _parser.add_argument( - '--%s' % (k), - action='store_true', - default=False, - help=v - ) + _parser.add_argument("--%s" % (k), action="store_true", default=False, help=v) args = vars(_parser.parse_args()) -if args.get('debug', False): +if args.get("debug", False): loglevel = 10 -elif args.get('quiet', False): +elif args.get("quiet", False): loglevel = 40 else: loglevel = 20 -logger = logging.getLogger('NASG') +logger = logging.getLogger("NASG") logger.setLevel(loglevel) console_handler = logging.StreamHandler() -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") console_handler.setFormatter(formatter) logger.addHandler(console_handler) -logging.getLogger('asyncio').setLevel(loglevel) +logging.getLogger("asyncio").setLevel(loglevel)