import os
import re
import sys
import collections
import logging
import glob
import img
import pypandoc
import langdetect
from cache import Cached
from slugify import slugify
from ruamel import yaml
from bs4 import BeautifulSoup
import frontmatter
from webmentiondb import WebmentionDB
import arrow
import json
import socket
import requests
import hashlib
import shutil
class SingularHandler(object):
def __init__(self, fpath, pingdb=WebmentionDB(), category='note'):
self.fpath = os.path.abspath(fpath)
path, fname = os.path.split(self.fpath)
fname, ext = os.path.splitext(fname)
self.fname = fname
self.fext = ext
self.ftime = os.stat(self.fpath)
self.target = os.path.join(glob.TARGET, "%s.html" % (self.fname))
basedir = os.path.join(glob.TARGET, "%s" % (self.fname))
if not os.path.isdir(basedir):
os.mkdir(basedir)
self.saved = os.path.join(glob.TARGET, "%s" % (self.fname), "saved.html")
self.pingdb = pingdb
self.title = ''
self.content = ''
self._content = ''
self.summary = ''
self.html = ''
self.sumhtml = ''
self.category = category
self.tags = []
self.reactions = {}
#self.date = datetime.datetime(1970, 1, 1).replace(tzinfo=pytz.utc)
self.date = arrow.get(0)
self.updated = None
self.dtime = 0
self.utime = 0
self.redirect = {}
self.exifmin = {}
self.lang = glob.conf['site']['lang']
self.syndicate = {}
self.syndications = []
self.template = 'singular.html'
self.slug = slugify(self.fname, only_ascii=True, lower=True)
self.shortslug = slugify(self.fname, only_ascii=True, lower=True)
self.img = None
self.srcset = ''
def __repr__(self):
return "Post '%s' (%s), category: %s" % (self.title,self.fname,self.category)
def _postsetup(self):
""" Shared post-setup - the initial thing, such at title, should be
set by the classes inheriting this one; these are only the common,
shared variables """
# set published epoch
#self.dtime = calendar.timegm(self.date.timetuple())
self.dtime = self.date.timestamp
# set updated epoch, if any and set the original file date according
# to either the updated or the published time
if self.updated:
#self.utime = calendar.timegm(self.updated.timetuple())
self.utime = self.updated.timestamp
if self.utime > 0 and self.utime != self.ftime.st_mtime:
os.utime(self.fpath, (self.utime, self.utime))
elif self.dtime > 0 and self.dtime != self.ftime.st_mtime:
os.utime(self.fpath, (self.dtime, self.dtime))
# generate shortslug from dtime if possible
if self.dtime > 0:
self.shortslug = SingularHandler.baseN(self.dtime)
self.redirect[self.shortslug] = 1
# detect post content language if possible
try:
self.lang = langdetect.detect("%s\n\n%s" % (self.title, self.content))
except:
pass
# make HTML from markdown via pandoc for the content and the summary
self.html = SingularHandler.pandoc_md2html(
self.content,
time=self.ftime
)
self.sumhtml = SingularHandler.pandoc_md2html(
self.summary,
time=self.ftime
)
self.url = "%s/%s" % (glob.conf['site']['url'], self.slug)
self.syndications = self.pingdb.posses(self.url)
#def urlsvg(self):
# import pyqrcode
# import tempfile
## generate qr code to the url
#qrname = tempfile.NamedTemporaryFile(prefix='pyqr_')
#qr = pyqrcode.create(self.url, error='L')
#qr.svg(
#qrname.name,
#xmldecl=False,
#omithw=True,
#scale=1,
#quiet_zone=0,
#svgclass='qr',
#lineclass='qrline'
#)
#with open(qrname.name) as f:
#qrsvg = f.read()
#f.close()
#return qrsvg
@staticmethod
def pandoc_md2html(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
extras = [
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
]
md = "markdown+" + "+".join(extras)
t = pypandoc.convert_text(t, to='html5', format=md)
cached.set(t)
return t
@staticmethod
def pandoc_html2md(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
t = pypandoc.convert_text(
t,
to="markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
]),
format='html'
)
cached.set(t)
return t
def tmpl(self):
return {
'title': self.title,
'published': self.date,
'tags': self.tags,
'author': glob.conf['author'],
'content': self.content,
'html': self.html,
'category': self.category,
'reactions': self.reactions,
'updated': self.updated,
'summary': self.sumhtml,
'exif': self.exifmin,
'lang': self.lang,
'syndicate': self.syndicate,
'slug': self.slug,
'shortslug': self.shortslug,
'srcset': self.srcset,
}
@staticmethod
def write_redirect(sslug, target, tstamp=arrow.utcnow().timestamp):
tmpl = glob.jinja2env.get_template('redirect.html')
jvars = {
'url': target
}
r = tmpl.render(jvars)
# this is to support / ending urls even for the redirects
dirs = [
os.path.join(glob.TARGET, sslug)
]
for d in dirs:
if not os.path.exists(d):
os.mkdir(d)
files = [
os.path.join(glob.TARGET, "%s.html" % (sslug)),
os.path.join(glob.TARGET, sslug, "index.html")
]
for f in files:
if os.path.isfile(f):
rtime = os.stat(f)
if tstamp == rtime.st_mtime:
logging.debug(
"Unchanged dates on redirect file %s", f
)
continue
with open(f, "w") as html:
logging.info("writing redirect file %s", f)
html.write(r)
html.close()
os.utime(f, (tstamp,tstamp))
def redirects(self):
""" Write redirect HTMLs """
if self.category == 'page':
return
for sslug in self.redirect.keys():
SingularHandler.write_redirect(sslug, self.url, self.ftime.st_mtime)
def write(self):
""" Write HTML file """
if os.path.isfile(self.target):
ttime = os.stat(self.target)
if self.ftime.st_mtime == ttime.st_mtime and not glob.FORCEWRITE:
logging.debug(
"Unchanged dates on %s; skipping rendering and writing",
self.fname
)
return
tmpl = glob.jinja2env.get_template(self.template)
logging.info("rendering %s", self.fname)
tmplvars = {
'post': self.tmpl(),
'site': glob.conf['site'],
'taxonomy': {},
}
r = tmpl.render(tmplvars)
soup = BeautifulSoup(r,"html5lib")
r = soup.prettify()
targets = [self.target]
for target in targets:
with open(target, "w") as html:
logging.info("writing %s", target)
html.write(r)
html.close()
os.utime(target, (self.ftime.st_mtime, self.ftime.st_mtime))
rdir = os.path.join(glob.TARGET, self.slug)
if not os.path.isdir(rdir):
os.mkdir(rdir)
altdst = os.path.join(glob.TARGET, self.slug, 'index.html')
altsrc = os.path.join('..', self.target)
if not os.path.islink(altdst):
if os.path.isfile(altdst):
os.unlink(altdst)
os.symlink(altsrc, altdst)
#links = []
#for r in self.reactions.items():
#reactiontype, urls = r
#if isinstance(urls, str):
#links.append(urls)
#elif isinstance(urls, list):
#links = [*links, *urls]
#if 1 == len(links):
#saved = os.path.join(glob.TARGET, self.slug, 'saved.html')
#if not os.path.isfile(saved):
#h, p = _localcopy_hashpath(links[0])
#c = self._get_localcopy(links[0], h, p)
#with open(saved, 'w') as f:
#f.write(c)
#f.close()
def index(self, ix):
""" Write search index """
writer = ix.writer()
c = "%s %s %s %s %s" % (
self.slug,
self.summary,
self._content,
yaml.dump(self.reactions, Dumper=yaml.RoundTripDumper),
yaml.dump(self.exifmin, Dumper=yaml.RoundTripDumper)
)
c = "%s %s" % (c, self._localcopy_include())
if self.img:
imgstr = self.img.mksrcset(generate_caption=False)
else:
imgstr = ''
writer.add_document(
title=self.title,
url=self.url,
content=c,
date=self.date.datetime,
tags=",".join(self.tags),
weight=1,
img=imgstr
)
writer.commit()
def pings(self):
""" Ping (webmention) all URLs found in the post """
links = []
urlregex = re.compile(
r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
)
matches = re.findall(urlregex, self.content)
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
matches.append(urls)
elif isinstance(urls, list):
matches = [*matches, *urls]
#for s in self.syndicate.keys():
#matches.append('https://brid.gy/publish/%s' % (s))
if self.utime and self.utime > 0:
time = self.utime
else:
time = self.dtime
if len(matches) > 0:
for link in matches:
if glob.conf['site']['domain'] in link:
continue
if link in links:
continue
#self._localcopy(link)
self.pingdb.ping(self.url, link, time)
links.append(link)
def _localcopy_hashpath(self,url):
h = hashlib.md5(url.encode('utf-8')).hexdigest()
p = os.path.join(glob.LOCALCOPIES, "%s.html" % (h))
return (h, p)
def _localcopy_include(self):
links = []
md = ''
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
links.append(urls)
elif isinstance(urls, list):
links = [*links, *urls]
for url in links:
h, p = self._localcopy_hashpath(url)
html = self._get_localcopy(url, h, p)
md = "%s %s" % (
md,
SingularHandler.pandoc_html2md(html, os.stat(p))
)
return md
def _get_localcopy(self, url, h, p):
html = ''
if os.path.isfile(p):
with open(p, 'r') as f:
html = f.read()
f.close()
else:
html = self._make_localcopy(url, h, p)
return html
def _make_localcopy(self, url, h, p):
post = self._pull_localcopy(url)
tmpl = glob.jinja2env.get_template('localcopy.html')
html = tmpl.render({'post': post})
soup = BeautifulSoup(html,"html5lib")
html = soup.prettify()
with open(p, "w") as f:
logging.info("saving readable copy of %s to %s", url, p)
f.write(html)
f.close()
return html
def _pull_localcopy(self, url):
# find the true URL
# MAYBE: add fallback to archive.org?
realurl = url
try:
pretest = requests.head(url, allow_redirects=True, timeout=30)
realurl = pretest.url
except:
pass
parsed = {
'lang': 'en',
'url': url,
'realurl': realurl,
'html': '',
'title': '',
'excerpt': '',
'byline': '',
}
if 'readable' in glob.conf and \
'port' not in glob.conf['readable'] and \
'host' not in glob.conf['readable']:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socktest = sock.connect_ex((
glob.conf['readable']['host'], int(glob.conf['readable']['port'])
))
if 0 == socktest:
text = self._localcopy_via_proxy(realurl)
parsed['html'] = text.get('content','')
parsed['title'] = text.get('title',url)
parsed['excerpt'] = text.get('excerpt', '')
parsed['byline'] = text.get('byline', '')
try:
parsed['lang'] = langdetect.detect(parsed['html'])
except:
pass
return parsed
# TODO: fallback to full-python solution if the previous failed
return parsed
def _localcopy_via_proxy(self, url):
r = "http://%s:%s/api/get?url=%s&sanitize=y" % (
glob.conf['readable']['host'],
glob.conf['readable']['port'],
url
)
try:
req = requests.get(r,allow_redirects=False,timeout=60);
except:
return None
text = {}
try:
text = json.loads(req.text)
except:
pass
return text
def _adaptify(self):
""" Generate srcset for all images possible """
linkto = False
isrepost = None
if len(self.reactions.keys()):
isrepost = list(self.reactions.keys())[0]
if isrepost:
if len(self.reactions[isrepost]) == 1:
linkto = self.reactions[isrepost][0]
mdmatch = re.compile(
r'!\[.*\]\(.*?\.(?:jpe?g|png|gif)'
r'(?:\s+[\'\"]?.*?[\'\"]?)?\)(?:\{.*?\})?'
)
mdsplit = re.compile(
r'!\[(.*)\]\((?:\/(?:files|cache)'
r'(?:\/[0-9]{4}\/[0-9]{2})?\/(.*\.(?:jpe?g|png|gif)))'
r'(?:\s+[\'\"]?(.*?)[\'\"]?)?\)(?:\{(.*?)\})?'
)
mdimg = re.findall(mdmatch, self.content)
for i in mdimg:
m = re.match(mdsplit, i)
if m:
#logging.info(m.groups())
imgpath = os.path.join(glob.SFILES, m.group(2))
if not os.path.isfile(imgpath):
for c in glob.conf['category'].items():
catn, catd = c
catp = os.path.abspath(os.path.join(glob.CONTENT, catn))
if not os.path.exists(catp) \
or not 'type' in catd \
or catd['type'] != 'photo':
continue
imgpath = os.path.join(catp, m.group(2))
break
if os.path.isfile(imgpath):
t = ''
if m.group(3):
t = m.group(3)
cl = ''
if m.group(4):
cl = m.group(4)
a = ''
if m.group(1):
a = m.group(1)
im = img.ImageHandler(
imgpath,
alttext=a,
title=t,
imgcl=cl,
linkto=linkto
)
im.downsize()
logging.debug("replacing image %s with srcset", imgpath)
srcset = im.mksrcset()
if srcset:
self.content = self.content.replace(i, srcset)
del(im)
else:
logging.error("%s missing %s", m.group(2), self.fpath)
def _video(self):
""" [video] shortcode extractor """
match = re.compile(r'\[video mp4=\"/(?:files|cache).*?\"\]\[/video\]')
split = re.compile(r'\[video mp4=\"(/(?:files|cache)\/(.*?))\"\]\[/video\]')
videos = re.findall(match, self.content)
for vid in videos:
v = re.match(split, vid)
video = """
""" % (v.group(1))
self.content = self.content.replace(vid, video)
#def _files(self):
#""" Copy misc files referenced """
#match = re.compile(
#r'\s(?:%s)?/(?:files|cache)'
#r'/.*\.(?:(?!jpe?g|png|gif).*)\s' % (glob.conf['site']['domain'])
#)
#split = re.compile(
#r'\s(?:%s)?/((?:files|cache)'
#r'/(.*\.(?:(?!jpe?g|png|gif).*)))\s' % (glob.conf['site']['domain'])
#)
##files = re.findall(match, self.content)
##print(files)
def _snippets(self):
""" Replaces [git:(repo)/(file.ext)] with corresponding code snippet """
snmatch = re.compile(r'\[git:[^\/]+\/(?:.*\..*)\]')
snsplit = re.compile(r'\[git:([^\/]+)\/((?:.*)\.(.*))\]')
snippets = re.findall(snmatch, self.content)
isconf = re.compile(r'conf', re.IGNORECASE)
for snippet in snippets:
sn = re.match(snsplit, snippet)
if sn:
fpath = os.path.join(glob.SOURCE, sn.group(1), sn.group(2))
if not os.path.isfile(fpath):
logging.error(
"missing blogsnippet in %s: %s",
self.fpath,
fpath
)
continue
if re.match(isconf, sn.group(3)):
lang = 'apache'
else:
lang = sn.group(3)
with open(fpath, "r") as snip:
c = snip.read()
snip.close
c = "\n\n```%s\n%s\n```\n" % (lang, c)
logging.debug("replacing blogsnippet %s", fpath)
self.content = self.content.replace(snippet, c)
@staticmethod
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
""" Used to create short, lowecase slug for a number (an epoch) passed """
num = int(num)
return ((num == 0) and numerals[0]) or (
SingularHandler.baseN(
num // b,
b,
numerals
).lstrip(numerals[0]) + numerals[num % b]
)
class ArticleHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(ArticleHandler, self).__init__(*args, **kwargs)
self.dctype = 'Text'
self._setup()
def _setup(self):
post = frontmatter.load(self.fpath)
self.meta = post.metadata
self.content = post.content
self._content = '%s' % (self.content)
if 'tags' in post.metadata:
self.tags = post.metadata['tags']
if 'title' in post.metadata:
self.title = post.metadata['title']
if 'published' in post.metadata:
self.date = arrow.get(post.metadata['published'])
if 'updated' in post.metadata:
self.updated = arrow.get(post.metadata['updated'])
if 'summary' in post.metadata:
self.summary = post.metadata['summary']
if 'redirect' in post.metadata and \
isinstance(post.metadata['redirect'], list):
for r in post.metadata['redirect']:
self.redirect[r] = 1
if 'syndicate' in post.metadata:
z = post.metadata['syndicate']
if isinstance(z, str):
self.syndicate[z] = ''
elif isinstance(z, dict):
for s, c in z.items():
self.syndicate[s] = c
elif isinstance(z, list):
for s in z:
self.syndicate[s] = ''
self.reactions = {}
# getting rid of '-' to avoid css trouble and similar
rmap = {
'bookmark-of': 'bookmark',
'repost-of': 'repost',
'in-reply-to': 'reply',
}
for x in rmap.items():
key, replace = x
if key in self.meta:
if isinstance(self.meta[key], str):
self.reactions[replace] = [self.meta[key]]
elif isinstance(self.meta[key], list):
self.reactions[replace] = self.meta[key]
self._adaptify()
self._snippets()
self._video()
#self._files()
super(ArticleHandler, self)._postsetup()
class PhotoHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PhotoHandler, self).__init__(*args, **kwargs)
self.dctype = 'Image'
self.img = img.ImageHandler(self.fpath)
self.exif = self.img.exif
self._setup()
def _setup(self):
self.syndicate = {
'flickr': '',
}
keywords = [
'XMP:Keywords',
'IPTC:Keywords'
]
tags = {}
for key in keywords:
if key in self.exif and self.exif[key]:
if isinstance(self.exif[key], str):
self.exif[key] = self.exif[key].split(",")
if isinstance(self.exif[key], list):
for tag in self.exif[key]:
tags[str(tag).strip()] = 1
self.tags = list(tags.keys())
# content
keywords = [
'XMP:Description',
'IPTC:Caption-Abstract'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.content = self.exif[key]
break
self._content = '%s' % (self.content)
# title
keywords = [
'XMP:Title',
'XMP:Headline',
'IPTC:Headline'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.title = self.exif[key]
break
# datetime
keywords = [
'XMP:DateTimeDigitized',
'XMP:CreateDate',
'EXIF:CreateDate',
'EXIF:ModifyDate'
]
pattern = re.compile(
"(?P[0-9]{4}):(?P[0-9]{2}):(?P[0-9]{2})\s+"
"(?P[0-9]{2}:[0-9]{2}:[0-9]{2})Z?"
)
for key in keywords:
if key not in self.exif or not self.exif[key]:
continue
date = None
v = pattern.match(self.exif[key]).groupdict()
if not v:
continue
try:
date = arrow.get('%s-%s-%s %s' % (v['Y'], v['M'], v['D'], v['T']))
except:
continue
if date:
self.date = date
logging.debug("date for %s is set to %s from key %s", self.fname, self.date, key)
break
self.img.title = self.title
self.img.alttext = self.content
self.content = self.content + "\n\n" + self.img.mksrcset(generate_caption=False, uphoto=True)
self.img.downsize()
self.srcset = self.img.mksrcset(generate_caption=False, uphoto=False)
super(PhotoHandler, self)._postsetup()
def tmpl(self):
tmpl = super(PhotoHandler, self).tmpl()
tmpl['exif'] = {}
mapping = {
'camera': [
'EXIF:Model'
],
'aperture': [
'EXIF:FNumber',
'Composite:Aperture'
],
'shutter_speed': [
'EXIF:ExposureTime'
],
'focallength': [
'EXIF:FocalLength',
'Composite:FocalLength35efl',
],
'iso': [
'EXIF:ISO'
],
'lens': [
'Composite:LensID',
'MakerNotes:Lens',
'Composite:LensSpec'
]
}
for ekey, candidates in mapping.items():
for candidate in candidates:
if candidate in self.exif:
tmpl['exif'][ekey] = self.exif[candidate]
break
gps = ['Latitude', 'Longitude']
for g in gps:
gk = 'EXIF:GPS%s' % (g)
if gk not in self.exif:
continue
r = 'EXIF:GPS%sRef' % (g)
ref = None
if r in self.exif:
ref = self.exif[r]
tmpl['exif']['geo_%s' % (g.lower())] = self.gps2dec(
self.exif[gk],
ref
)
##tmpl['imgurl'] = ''
#sizes = collections.OrderedDict(reversed(list(self.img.sizes.items())))
#for size, meta in sizes.items():
#if os.path.isfile(meta['path']):
#with Image.open(meta['path']) as im:
#meta['width'], meta['height'] = im.size
#meta['size'] = os.path.getsize(meta['path'])
#tmpl['img'] = meta
#break
tmpl['img'] = self.img.meta
return tmpl
@staticmethod
def gps2dec(exifgps, ref=None):
pattern = re.compile(r"(?P[0-9.]+)\s+deg\s+(?P[0-9.]+)'\s+(?P[0-9.]+)\"(?:\s+(?P[NEWS]))?")
v = pattern.match(exifgps).groupdict()
dd = float(v['deg']) + (((float(v['min']) * 60) + (float(v['sec']))) / 3600)
if ref == 'West' or ref == 'South' or v['dir'] == "S" or v['dir'] == "W":
dd = dd * -1
return round(dd, 6)
class PageHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PageHandler, self).__init__(*args, **kwargs)
self._setup()
def _setup(self):
with open(self.fpath) as c:
self.content = c.read()
c.close()
self._content = '%s' % (self.content)
self._adaptify()
super(PageHandler, self)._postsetup()
self.template = 'page.html'