1.0 version

Peter Molnar 2017-05-23 11:10:30 +01:00
commit f5c599cef9
11 changed files with 3276 additions and 0 deletions

103
.gitignore vendored Normal file

@@ -0,0 +1,103 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
config.ini
config.yml

56
cache.py Normal file

@@ -0,0 +1,56 @@
import os
import json
import hashlib
import logging
import glob
class Cached(object):
def __init__(self, hash='', text='', stime=0):
if not os.path.isdir(glob.CACHE):
os.mkdir(glob.CACHE)
if hash:
self._hbase = hash
elif text:
self._hbase = hashlib.sha1(text.encode('utf-8')).hexdigest()
else:
raise ValueError("No identifier passed for Cached")
self._cpath = os.path.join(glob.CACHE, self._hbase)
self._stime = stime
if os.path.isfile(self._cpath):
self._ctime = os.stat(self._cpath)
else:
self._ctime = None
def get(self):
if not glob.CACHEENABLED:
return None
cached = ''
if os.path.isfile(self._cpath):
if self._stime and self._stime.st_mtime == self._ctime.st_mtime:
logging.debug("Cache exists at %s; using it" % (self._cpath ))
with open(self._cpath, 'r') as c:
cached = c.read()
c.close()
# invalidate old
elif self._stime and self._stime.st_mtime > self._ctime.st_mtime:
logging.debug("invalidating cache at %s" % (self._cpath ))
os.remove(self._cpath)
return cached
def set(self, content):
if not glob.CACHEENABLED:
return None
with open(self._cpath, "w") as c:
logging.debug("writing cache to %s" % (self._cpath ))
c.write(content)
c.close()
if self._stime:
os.utime(self._cpath, (self._stime.st_mtime, self._stime.st_mtime ))
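
A minimal usage sketch of the cache above, assuming config.yml exists so the glob module loads and glob.CACHEENABLED is true; the source path and the render_expensive() helper are made up for illustration.

    import os
    from cache import Cached

    def render_expensive(markdown_text):
        # stand-in for a slow conversion step (e.g. pandoc)
        return "<p>%s</p>" % markdown_text

    fpath = "content/note/example.md"        # hypothetical source file
    stime = os.stat(fpath)                   # its mtime decides cache validity
    with open(fpath) as f:
        text = f.read()

    cached = Cached(text=text, stime=stime)  # key is the sha1 of the text
    html = cached.get()                      # empty or None on a miss or stale entry
    if not html:
        html = render_expensive(text)
        cached.set(html)                     # stored under glob.CACHE, mtime synced to the source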

293
generator.py Normal file

@@ -0,0 +1,293 @@
#!/home/petermolnar.net/.venv/bin/python3.5
"""Usage: generator.py [-h] [-f] [-g] [-p] [-d] [-s FILE]
-h --help show this
-f --force force HTML file rendering
-p --pandoc force re-rendering content HTML
-g --regenerate regenerate images
-s --single FILE only (re)generate a single entity
-d --debug set logging level
"""
import os
import shutil
import logging
import atexit
import json
import sys
import tempfile
import glob
from whoosh import index
from docopt import docopt
from ruamel import yaml
from webmentiontools.send import WebmentionSend
import taxonomy
import singular
from slugify import slugify
import arrow
class Engine(object):
lockfile = "/tmp/petermolnar.net.generator.lock"
def __init__(self):
if os.path.isfile(self.lockfile):
raise ValueError("Lockfile %s is present; generator won't run.")
else:
with open(self.lockfile, "w") as lock:
lock.write(arrow.utcnow().format())
lock.close()
atexit.register(self.removelock)
atexit.register(self.removetmp)
self._mkdirs()
self.tags = {}
self.category = {}
self.allposts = None
self.frontposts = None
self.slugsdb = os.path.join(glob.CACHE, "slugs.json")
if os.path.isfile(self.slugsdb):
with open(self.slugsdb) as slugsdb:
self.allslugs = json.loads(slugsdb.read())
slugsdb.close()
else:
self.allslugs = []
self.tmpwhoosh = tempfile.mkdtemp('whooshdb_', dir=tempfile.gettempdir())
self.whoosh = index.create_in(self.tmpwhoosh, glob.schema)
def removelock(self):
os.unlink(self.lockfile)
def removetmp(self):
if os.path.isdir(self.tmpwhoosh):
for root, dirs, files in os.walk(self.tmpwhoosh, topdown=False):
for f in files:
os.remove(os.path.join(root, f))
for d in dirs:
os.rmdir(os.path.join(root, d))
def initbuilder(self):
self._copy_and_compile()
def cleanup(self):
with open(os.path.join(glob.CACHE, "slugs.json"), "w") as db:
logging.info("updating slugs database")
db.write(json.dumps(self.allslugs))
db.close()
tags = []
for tslug, tax in self.tags.items():
tags.append(tax.name)
with open(os.path.join(glob.CACHE, "tags.json"), "w") as db:
logging.info("updating tags database")
db.write(json.dumps(tags))
db.close()
logging.info("deleting old searchdb")
shutil.rmtree(glob.SEARCHDB)
logging.info("moving new searchdb")
shutil.move(self.tmpwhoosh, glob.SEARCHDB)
def _mkdirs(self):
for d in [glob.TARGET, glob.TFILES, glob.TTHEME, glob.CACHE]:
if not os.path.isdir(d):
os.mkdir(d)
def _copy_and_compile(self):
for f in os.listdir(glob.STHEME):
p = os.path.join(glob.STHEME, f)
if os.path.isdir(p):
try:
shutil.copytree(p, os.path.join(glob.TTHEME, f))
except FileExistsError:
pass
else:
path, fname = os.path.split(p)
fname, ext = os.path.splitext(fname)
logging.debug("copying %s", p)
shutil.copy(p, os.path.join(glob.TTHEME, f))
@staticmethod
def postbycategory(fpath, catd=None, catn=None):
if catd == 'photo':
post = singular.PhotoHandler(fpath, category=catn)
elif catd == 'page':
post = singular.PageHandler(fpath)
else:
post = singular.ArticleHandler(fpath, category=catn)
return post
def collect(self):
self.allposts = taxonomy.TaxonomyHandler()
#self.gallery = taxonomy.TaxonomyHandler(taxonomy="photography", name="Photography")
self.frontposts = taxonomy.TaxonomyHandler()
for category in glob.conf['category'].items():
catn, catd = category
catp = os.path.abspath(os.path.join(glob.CONTENT, catn))
if not os.path.exists(catp):
continue
logging.debug("getting posts for category %s from %s", catn, catp)
cat = taxonomy.TaxonomyHandler(taxonomy='category', name=catn)
self.category[catn] = cat
for f in os.listdir(catp):
fpath = os.path.join(catp, f)
if not os.path.isfile(fpath):
continue
logging.debug("parsing %s", fpath)
exclude = False
if 'exclude' in catd:
exclude = bool(catd['exclude'])
ct = None
if 'type' in catd:
ct = catd['type']
post = Engine.postbycategory(fpath, catd=ct, catn=catn)
self.allposts.append(post)
if post.dtime > arrow.utcnow().timestamp:
logging.warning(
"Post '%s' will be posted in the future; "
"skipping it from Taxonomies for now", fpath
)
else:
cat.append(post)
if not exclude:
self.frontposts.append(post)
if hasattr(post, 'tags') and isinstance(post.tags, list):
for tag in post.tags:
tslug = slugify(tag, only_ascii=True, lower=True)
if not tslug in self.tags.keys():
t = taxonomy.TaxonomyHandler(taxonomy='tag', name=tag)
self.tags[tslug] = t
else:
t = self.tags[tslug]
t.append(post)
elif not hasattr(post, 'tags'):
logging.error("%s post does not have tags", post.fname)
elif not isinstance(post.tags, list):
logging.error(
"%s tags are not a list, it's %s ",
post.fname,
type(post.tags)
)
for r in post.redirect.keys():
self.allslugs.append(r)
self.allslugs.append(post.fname)
def renderposts(self):
for p in self.allposts.posts.items():
time, post = p
post.write()
post.redirects()
post.pings()
post.index(self.whoosh)
def rendertaxonomies(self):
for t in [self.tags, self.category]:
for tname, tax in t.items():
if glob.conf['category'].get(tname, False):
if glob.conf['category'][tname].get('nocollection', False):
logging.info("skipping taxonomy '%s' due to config nocollections", tname)
continue
tax.write_paginated()
tax.index(self.whoosh)
self.frontposts.write_paginated()
#self.gallery.write_simple(template='gallery.html')
self.allposts.writesitemap()
def globredirects(self):
redirects = os.path.join(glob.CONTENT,'redirects.yml')
if not os.path.isfile(redirects):
return
ftime = os.stat(redirects)
rdb = {}
with open(redirects, 'r') as db:
rdb = yaml.safe_load(db)
db.close()
for r_ in rdb.items():
target, slugs = r_
for slug in slugs:
singular.SingularHandler.write_redirect(
slug,
"%s/%s" % (glob.conf['site']['url'], target),
ftime.st_mtime
)
def recordlastrun(self):
if os.path.exists(glob.lastrun):
t = arrow.utcnow().timestamp
os.utime(glob.lastrun, (t,t))
else:
open(glob.lastrun, 'a').close()
if __name__ == '__main__':
args = docopt(__doc__, version='generator.py 0.2')
if args['--pandoc']:
glob.CACHEENABLED = False
if args['--force']:
glob.FORCEWRITE = True
if args['--regenerate']:
glob.REGENERATE = True
logform = '%(asctime)s - %(levelname)s - %(message)s'
if args['--debug']:
loglevel = 10
else:
loglevel = 40
while len(logging.root.handlers) > 0:
logging.root.removeHandler(logging.root.handlers[-1])
logging.basicConfig(level=loglevel, format=logform)
if args['--single']:
logging.info("(re)generating a single item only")
path = args['--single'].split('/')
fpath = os.path.join(glob.CONTENT, path[0], path[1])
post = Engine.postbycategory(fpath, catd=path[0])
post.pings()
post.write()
sys.exit(0)
else:
eng = Engine()
eng.initbuilder()
eng.collect()
eng.renderposts()
eng.globredirects()
eng.rendertaxonomies()
eng.recordlastrun()
eng.cleanup()
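
To make the command-line contract in the docstring above concrete, here is a small sketch of how docopt maps those flags; the note/example.md argument is hypothetical.

    from docopt import docopt

    USAGE = """Usage: generator.py [-h] [-f] [-g] [-p] [-d] [-s FILE]

    -h --help         show this
    -f --force        force HTML file rendering
    -p --pandoc       force re-rendering content HTML
    -g --regenerate   regenerate images
    -s --single FILE  only (re)generate a single entity
    -d --debug        set logging level
    """

    args = docopt(USAGE, argv=['-f', '-s', 'note/example.md'])
    # docopt keys the result by the long option names, roughly:
    # {'--force': True, '--single': 'note/example.md', '--debug': False,
    #  '--pandoc': False, '--regenerate': False, '--help': False}
    # __main__ above then splits '--single' on '/' into (category, filename)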

109
glob.py Normal file

@@ -0,0 +1,109 @@
import os
import logging
from ruamel import yaml
from whoosh import fields
from whoosh import analysis
import jinja2
from slugify import slugify
import arrow
schema = fields.Schema(
url=fields.ID(
stored=True,
),
title=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
date=fields.DATETIME(
stored=True,
sortable=True
),
content=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
tags=fields.TEXT(
stored=True,
analyzer=analysis.KeywordAnalyzer(
lowercase=True,
commas=True
)
),
weight=fields.NUMERIC(
sortable=True
),
img=fields.TEXT(
stored=True
)
)
BASEDIR = os.path.dirname(os.path.abspath(__file__))
CONFIG = os.path.abspath(os.path.join(BASEDIR, 'config.yml'))
with open(CONFIG, 'r') as c:
conf = yaml.safe_load(c)
conf['site']['author'] = conf['author']
c.close()
secrets = os.path.abspath(os.path.join(BASEDIR, 'secret.yml'))
if os.path.isfile(secrets):
with open(secrets, 'r') as c:
conf['secrets'] = yaml.safe_load(c)
c.close()
CACHEENABLED = True
REGENERATE = False
FORCEWRITE = False
ISODATE = '%Y-%m-%dT%H:%M:%S%z'
SOURCE = os.path.abspath(conf['dirs']['source']['root'])
CONTENT = os.path.abspath(conf['dirs']['source']['content'])
FONT = os.path.abspath(conf['dirs']['font'])
STHEME = os.path.abspath(conf['dirs']['source']['theme'])
SFILES = os.path.abspath(conf['dirs']['source']['files'])
TEMPLATES = os.path.abspath(conf['dirs']['source']['templates'])
COMMENTS = os.path.abspath(conf['dirs']['source']['comments'])
TARGET = os.path.abspath(conf['dirs']['target']['root'])
TTHEME = os.path.abspath(conf['dirs']['target']['theme'])
TFILES = os.path.abspath(conf['dirs']['target']['files'])
UFILES = conf['dirs']['target']['furl']
CACHE = os.path.abspath(conf['dirs']['cache'])
SEARCHDB = os.path.abspath(conf['dirs']['searchdb'])
WEBMENTIONDB = os.path.abspath(conf['webmentiondb'])
LOGDIR = os.path.abspath(conf['dirs']['log'])
GPSDIR = os.path.abspath(conf['dirs']['gps'])
TSDBDIR = os.path.abspath(conf['dirs']['tsdb'])
LOCALCOPIES = os.path.abspath(conf['dirs']['localcopies'])
lastrun = '/tmp/generator_last_run'
os.environ.setdefault('PYPANDOC_PANDOC', '/usr/bin/pandoc')
def jinja_filter_date(d, form='%Y-%m-%d %H:%M:%S'):
if d == 'now':
return arrow.now().strftime(form)
if form == 'c':
form = '%Y-%m-%dT%H:%M:%S%z'
return d.strftime(form)
def jinja_filter_slugify(s):
return slugify(s, only_ascii=True, lower=True)
def jinja_filter_search(s, r):
if r in s:
return True
return False
jinjaldr = jinja2.FileSystemLoader(searchpath=TEMPLATES)
jinja2env = jinja2.Environment(loader=jinjaldr)
jinja2env.filters['date'] = jinja_filter_date
jinja2env.filters['search'] = jinja_filter_search
jinja2env.filters['slugify'] = jinja_filter_slugify
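
A small sketch of how the Whoosh schema defined above is used elsewhere in this commit (generator.py fills the index, receiver.py queries it), assuming config.yml is present so this module imports; the directory and document values are made up.

    import datetime
    import tempfile
    from whoosh import index, qparser
    import glob  # the module above, not the stdlib one

    tmpdir = tempfile.mkdtemp('whooshdb_')
    ix = index.create_in(tmpdir, glob.schema)

    writer = ix.writer()
    writer.add_document(
        url='https://example.net/example-slug',  # hypothetical entry
        title='Example post',
        date=datetime.datetime.utcnow(),
        content='some searchable text',
        tags='example,sketch',
        weight=1,
        img=''
    )
    writer.commit()

    # query it the same way receiver.py's SearchHandler does
    qp = qparser.MultifieldParser(["title", "content", "tags"], schema=glob.schema)
    with ix.searcher() as searcher:
        for hit in searcher.search(qp.parse('searchable'), sortedby="weight", limit=10):
            print(hit['title'], hit['url'])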

370
img.py Normal file

@@ -0,0 +1,370 @@
import os
import re
import sys
import json
import shutil
import collections
import logging
import imghdr
from ctypes import c_void_p, c_size_t
import glob
import pyexifinfo
from similar_text import similar_text
from cache import Cached
import wand.api
import wand.image
import wand.drawing
import wand.color
from PIL import Image
#from subprocess import call
# https://stackoverflow.com/questions/34617422/how-to-optimize-image-size-using-wand-in-python
wand.api.library.MagickSetCompressionQuality.argtypes = [c_void_p, c_size_t]
class ImageHandler(object):
def __init__(self, fpath, alttext='', title='', imgcl='', linkto=False):
self.fpath = os.path.abspath(fpath)
path, fname = os.path.split(self.fpath)
fname, ext = os.path.splitext(fname)
self.fname = fname
self.fext = ext
self.ftime = os.stat(self.fpath)
self.linkto = linkto
self.alttext = alttext
self.title = title
self.imgcl = imgcl
self.c = os.path.join(glob.TFILES, self.fname)
self.u = "%s/%s/%s" % (glob.conf['site']['url'],glob.UFILES, self.fname)
self.what = imghdr.what(self.fpath)
self.meta = {}
self.exif = {}
if self.what == 'jpeg':
self._setexif()
self.watermark = ''
wfile = os.path.join(glob.SOURCE, glob.conf['watermark'])
if os.path.isfile(wfile):
self.watermark = wfile
sizes = {
90: {
'ext': 's',
'crop': True,
},
360: {
'ext': 'm',
},
#540: 'n',
720: {
'ext': 'z',
},
#980: 'c',
1280: {
'ext': 'b',
}
}
self.sizes = collections.OrderedDict(sorted(sizes.items(), reverse=0))
for size, meta in self.sizes.items():
meta['path'] = "%s_%s%s" % (self.c, meta['ext'], self.fext)
meta['url'] = "%s_%s%s" % (self.u, meta['ext'], self.fext)
meta['mime'] = "image/%s" % (self.what)
self._setmeta()
self.fallbacksize = 720
self.srcsetmin = 720
self._is_photo()
if self.is_photo:
self.srcset = self.mksrcset(generate_caption=False, uphoto=False)
def _setmeta(self):
s = collections.OrderedDict(reversed(list(self.sizes.items())))
for size, meta in s.items():
if os.path.isfile(meta['path']):
with Image.open(meta['path']) as im:
meta['width'], meta['height'] = im.size
meta['size'] = os.path.getsize(meta['path'])
self.meta = meta
break
def downsize(self, liquidcrop=True, watermark=True):
if not self._is_downsizeable():
return self._copy()
if not self._isneeded():
logging.debug("downsizing not needed for %s", self.fpath)
return
logging.debug("downsizing %s", self.fpath)
try:
img = wand.image.Image(filename=self.fpath)
img.auto_orient()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
# watermark
if self.is_photo and self.watermark and img.format == "JPEG" and watermark:
img = self._watermark(img)
elif self.linkto:
img = self._sourceurlmark(img)
# resize & cache
for size, meta in self.sizes.items():
self._intermediate(img, size, meta, liquidcrop)
self._setmeta()
def _setexif(self):
cached = Cached(text=self.fname, stime=self.ftime)
cexif = cached.get()
if cexif:
self.exif = json.loads(cexif)
else:
exif = pyexifinfo.get_json(self.fpath)
self.exif = exif.pop()
cached.set(json.dumps(self.exif))
def _is_photo(self):
self.is_photo = False
if 'cameras' in glob.conf:
if 'EXIF:Model' in self.exif:
if self.exif['EXIF:Model'] in glob.conf['cameras']:
self.is_photo = True
if 'copyright' in glob.conf:
if 'IPTC:CopyrightNotice' in self.exif:
for s in glob.conf['copyright']:
pattern = re.compile(r'%s' % s)
if pattern.search(self.exif['IPTC:CopyrightNotice']):
self.is_photo = True
if self.is_photo:
#self.category = "photo"
if not self.alttext:
keywords = ['XMP:Description', 'IPTC:Caption-Abstract']
for key in keywords:
if key in self.exif and self.exif[key]:
self.alttext = self.exif[key]
break
if not self.title:
keywords = ['XMP:Title', 'XMP:Headline', 'IPTC:Headline']
for key in keywords:
if key in self.exif and self.exif[key]:
self.title = self.exif[key]
break
def _is_downsizeable(self):
if self.what != 'jpeg' and self.what != 'png':
return False
if self.imgcl:
return False
return True
def _watermark(self, img):
wmark = wand.image.Image(filename=self.watermark)
if img.width > img.height:
w = img.width * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - w - (img.width * 0.01)
y = img.height - h - (img.height * 0.01)
else:
w = img.height * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - h - (img.width * 0.01)
y = img.height - w - (img.height * 0.01)
w = round(w)
h = round(h)
x = round(x)
y = round(y)
wmark.resize(w, h)
if img.width < img.height:
wmark.rotate(-90)
img.composite(image=wmark, left=x, top=y)
return img
def _sourceurlmark(self, img):
with wand.drawing.Drawing() as draw:
draw.fill_color = wand.color.Color('#fff')
draw.fill_opacity = 0.8
draw.stroke_color = wand.color.Color('#fff')
draw.stroke_opacity = 0.8
r_h = round(img.height * 0.3)
r_top = round((img.height/2) - (r_h/2))
draw.rectangle(
left=0,
top=r_top,
width=img.width,
height=r_h
)
draw(img)
with wand.drawing.Drawing() as draw:
draw.font = os.path.join(glob.FONT)
draw.font_size = round((img.width)/len(self.linkto)*1.5)
draw.gravity = 'center'
draw.text(
0,
0,
self.linkto
)
draw(img)
return img
def _copy(self):
p = self.c + self.fext
if not os.path.isfile(p):
logging.debug("copying %s" % self.fpath)
shutil.copy(self.fpath, p)
return
def _isneeded(self):
# skip existing
needed = False
if glob.REGENERATE:
needed = True
else:
for size, meta in self.sizes.items():
if not os.path.isfile(meta['path']):
needed = True
return needed
def _intermediate_dimensions(self, img, size, meta):
if (img.width > img.height and 'crop' not in meta) \
or (img.width < img.height and 'crop' in meta):
width = size
height = int(float(size / img.width) * img.height)
else:
height = size
width = int(float(size / img.height) * img.width)
return (width, height)
def _intermediate_symlink(self, meta):
# create a symlink to the largest resize with the full filename;
# this is to ensure backwards compatibility and avoid 404s
altsrc = meta['path']
altdst = self.c + self.fext
if not os.path.islink(altdst):
if os.path.isfile(altdst):
os.unlink(altdst)
os.symlink(altsrc, altdst)
def _intermediate(self, img, size, meta, liquidcrop=True):
# skip existing unless regenerate needed
if os.path.isfile(meta['path']) and not glob.REGENERATE:
return
# too small images: move on
#if size > img.height and size > img.width:
# return
width, height = self._intermediate_dimensions(img, size, meta)
try:
thumb = img.clone()
thumb.resize(width, height)
#thumb.resize(width, height, filter='robidouxsharp')
if 'crop' in meta and liquidcrop:
thumb.liquid_rescale(size, size, 1, 1)
elif 'crop' in meta:
l = t = 0
if width > size:
l = int((width - size) / 2)
if height > size:
t = int((height - size) / 2)
thumb.crop(left=l, top=t, width=size, height=size)
if img.format == "PNG":
wand.api.library.MagickSetCompressionQuality(img.wand, 75)
if img.format == "JPEG":
thumb.compression_quality = 86
thumb.unsharp_mask(radius=0, sigma=0.5, amount=1, threshold=0.03)
thumb.format = 'pjpeg'
# this is to make sure pjpeg happens
with open(meta['path'], 'wb') as f:
thumb.save(file=f)
if size == list(self.sizes.keys())[-1]:
self._intermediate_symlink(meta)
#if img.format == "JPEG":
## this one strips the embedded little jpg
#call(['/usr/bin/jhead', '-dt', '-q', cpath])
except:
print("Unexpected error:", sys.exc_info()[0])
raise
def mksrcset(self, generate_caption=True, uphoto=False):
if not self._is_downsizeable():
return False
for size, meta in self.sizes.items():
if 'crop' in meta:
continue
# increase fallback until max fallback reached
if size <= self.fallbacksize:
fallback = meta['url']
# set target for the largest
target = meta['url']
if uphoto:
uphotoclass=' u-photo'
else:
uphotoclass=''
caption = ''
if not self.imgcl:
cl = ''
else:
cl = self.imgcl
if self.alttext \
and similar_text(self.alttext, self.fname) < 90 \
and similar_text(self.alttext, self.fname + '.' + self.fext) < 90 \
and generate_caption:
caption = '<figcaption class=\"caption\">%s</figcaption>' % (self.alttext)
if self.linkto:
target = self.linkto
return '<figure class="photo"><a target="_blank" class="adaptive%s" href="%s"><img src="%s" class="adaptimg %s" alt="%s" /></a>%s</figure>' % (uphotoclass, target, fallback, self.imgcl, self.alttext, caption)

203
new.py Executable file

@@ -0,0 +1,203 @@
#!/home/petermolnar.net/.venv/bin/python3.5
"""Usage: new.py [-h] [-t TAGS] [-d DATE] [-s SLUG] [-l TITLE] [-b BOOKMARK] [-r REPLY] [-p REPOST] [-c CONTENT] [-u SUMMARY] [-i REDIRECT] [-a CATEGORY]
-h --help show this
-t --tags TAGS ';' separated, quoted list of tags
-d --date DATE YYYY-mm-ddTHH:MM:SS+TZTZ formatted date, if not now
-s --slug SLUG slug (normally autogenerated from title or pubdate)
-l --title TITLE title of new entry
-b --bookmark BOOKMARK URL to bookmark
-r --reply REPLY URL to reply to
-p --repost REPOST URL to repost
-c --content CONTENT content of entry
-u --summary SUMMARY summary of entry
-i --redirect REDIRECT ';' separated, quoted list of redirects
-a --category CATEGORY to put the content in this category
"""
import os
import sys
import datetime
import calendar
import logging
import json
import glob
import iso8601
import pytz
from docopt import docopt
from slugify import slugify
from ruamel import yaml
import singular
class ContentCreator(object):
def __init__(
self,
category='note',
tags=[],
date='',
slug='',
title='',
bookmark='',
reply='',
repost='',
content='',
summary='',
redirect=[]
):
self.category = category
if date:
self.date = iso8601.parse_date(date)
else:
self.date = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
self.time = calendar.timegm(self.date.timetuple())
self.title = title
if slug:
self.slug = slug
elif title:
self.slug = slugify(title, only_ascii=True, lower=True)
else:
self.slug = singular.SingularHandler.baseN(self.time)
self.tags = tags
self.bookmark = bookmark
self.reply = reply
self.repost = repost
if content:
self.content = content
else:
self.content = ''
self.summary = summary
self.redirect = redirect
self._makeyaml()
self._write()
def _makeyaml(self):
self.yaml = {
'published': self.date.strftime("%Y-%m-%dT%H:%M:%S%z")
}
if self.title:
self.yaml['title'] = self.title
if self.tags:
self.yaml['tags'] = self.tags
if self.bookmark:
self.yaml['bookmark-of'] = self.bookmark
if self.repost:
self.yaml['repost-of'] = self.repost
if self.reply:
self.yaml['in-reply-to'] = self.reply
if self.summary:
self.yaml['summary'] = self.summary
if self.redirect:
self.yaml['redirect'] = self.redirect
def _write(self):
fdir = os.path.join(glob.CONTENT, self.category)
if not os.path.isdir(fdir):
sys.exit("there is no category %s" % (self.category))
self.fpath = os.path.join(glob.CONTENT, self.category, "%s.md" % (self.slug))
self.out = "---\n" + yaml.dump(self.yaml, Dumper=yaml.RoundTripDumper) + "---\n\n" + self.content
with open(self.fpath, "w") as archive:
logging.info("writing %s", self.fpath)
logging.info("contents: %s", self.out)
archive.write(self.out)
archive.close()
class ParseCMDLine(object):
def __init__(self, arguments):
for x in ['--redirect', '--tags']:
if x in arguments and arguments[x]:
arguments[x] = arguments[x].split(";")
self.entry = ContentCreator(
category=arguments['--category'],
tags=arguments['--tags'],
date=arguments['--date'],
slug=arguments['--slug'],
title=arguments['--title'],
bookmark=arguments['--bookmark'],
reply=arguments['--reply'],
repost=arguments['--repost'],
content=arguments['--content'],
summary=arguments['--summary'],
redirect=arguments['--redirect']
)
if __name__ == '__main__':
args = docopt(__doc__, version='new.py 0.1')
with open(os.path.join(glob.CACHE, "slugs.json")) as sf:
slugs = json.loads(sf.read())
sf.close()
if not args['--category']:
c = 'note'
args['--category'] = input('Category [%s]: ' % (c)) or c
if not args['--date']:
d = datetime.datetime.utcnow().replace(tzinfo=pytz.utc).strftime("%Y-%m-%dT%H:%M:%S%z")
args['--date'] = input('Date [%s]' % (d)) or d
if not args['--title']:
args['--title'] = input('Title []:') or ''
if not args['--tags']:
args['--tags'] = input('Tags (separated by ;, no whitespace) []:') or []
if not args['--bookmark']:
args['--bookmark'] = input('Bookmark of URL []:') or ''
if not args['--reply']:
args['--reply'] = input('Reply to URL []:') or ''
if not args['--repost']:
args['--repost'] = input('Repost of URL []:') or ''
if not args['--slug']:
if args['--title']:
slug = slugify(args['--title'], only_ascii=True, lower=True)
elif args['--bookmark']:
slug = slugify("re: %s" % (args['--bookmark']), only_ascii=True, lower=True)
elif args['--reply']:
slug = slugify("re: %s" % (args['--reply']), only_ascii=True, lower=True)
elif args['--repost']:
slug = slugify("re: %s" % (args['--repost']), only_ascii=True, lower=True)
else:
d = iso8601.parse_date(args['--date'])
t = calendar.timegm(d.timetuple())
slug = singular.SingularHandler.baseN(t)
args['--slug'] = input('Slug [%s]:' % (slug)) or slug
if args['--slug'] in slugs:
logging.warning("This slug already exists: %s", args['--slug'])
slugbase = args['--slug']
inc = 1
while args['--slug'] in slugs:
args['--slug'] = "%s-%d" % (slugbase, inc)
inc = inc+1
logging.warning("Using %s as slug", args['--slug'])
if not args['--summary']:
args['--summary'] = input('Summary []:') or ''
if not args['--content']:
args['--content'] = input('Content []:') or ''
if not args['--redirect']:
args['--redirect'] = input('Additional slugs (separated by ;, no whitespace) []:') or []
p = ParseCMDLine(args)
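
Besides the interactive prompts above, ContentCreator can also be driven directly; a sketch assuming a note directory exists under glob.CONTENT, with made-up values.

    from new import ContentCreator

    # writes <glob.CONTENT>/note/hello-world.md with a YAML front matter block
    entry = ContentCreator(
        category='note',
        title='Hello world',
        tags=['example', 'notes'],
        content='First post created through the ContentCreator API.'
    )
    print(entry.fpath)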

850
receiver.py Normal file

@@ -0,0 +1,850 @@
import glob
import asyncio
import uvloop
import os
import sys
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
from whoosh import index, qparser
import pynmea2
import datetime
import pytz
import re
import validators
import requests
import pypandoc
import hashlib
import time
from webmentiontools import urlinfo
import json
import calendar
import mimetypes
import singular
import urllib.parse
from ruamel import yaml
from slugify import slugify
import smtplib
import iso8601
import csv
import shutil
import collections
from git import Repo, Actor
import frontmatter
#import gzip
import arrow
class ToEmail(object):
def __init__(self, webmention):
self.webmention = webmention
self.set_html()
self.set_headers()
def set_html(self):
for authormeta in ['email', 'name', 'url']:
if not authormeta in self.webmention['author']:
self.webmention['author'][authormeta] = ''
html = """
<html>
<head></head>
<body>
<h1>
New %s
</h1>
<dl>
<dt>From</dt>
<dd>
<a href="%s">%s</a><br />
<a href="mailto:%s">%s</a>
</dd>
<dt>Source</dt>
<dd><a href="%s">%s</a></dd>
<dt>Target</dt>
<dd><a href="%s">%s</a></dd>
</dl>
%s
</body>
</html>""" % (
self.webmention['type'],
self.webmention['author']['url'],
self.webmention['author']['name'],
self.webmention['author']['email'],
self.webmention['author']['email'],
self.webmention['source'],
self.webmention['source'],
self.webmention['target'],
self.webmention['target'],
pypandoc.convert_text(
self.webmention['content'],
to='html5',
format="markdown+" + "+".join([
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
])
)
)
self.html = html
def set_headers(self):
""" Create and send email from a parsed webmention """
self.headers = {
'Content-Type': 'text/html; charset=utf-8',
'Content-Disposition': 'inline',
'Content-Transfer-Encoding': '8bit',
'Date': self.webmention['date'].strftime('%a, %d %b %Y %H:%M:%S %Z'),
'X-WEBMENTION-SOURCE': self.webmention['source'],
'X-WEBMENTION-TARGET': self.webmention['target'],
'From': glob.conf['from']['address'],
'To': glob.conf['to']['address'],
'Subject': "[webmention] from %s to %s" % ( self.webmention['source'], self.webmention['target'] ),
}
def send(self):
msg = ''
for key, value in self.headers.items():
msg += "%s: %s\n" % ( key, value )
msg += "\n%s\n" % self.html
try:
s = smtplib.SMTP( glob.conf['smtp']['host'], glob.conf['smtp']['port'] )
if glob.conf['smtp']['tls']:
s.ehlo()
s.starttls()
s.ehlo()
if glob.conf['smtp']['username'] and glob.conf['smtp']['password']:
s.login(glob.conf['smtp']['username'], glob.conf['smtp']['password'])
s.sendmail( self.headers['From'], [ self.headers['To'] ], msg.encode("utf8") )
s.quit()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
class MicropubHandler(object):
def __init__(self, request):
self.request = request
self.response = sanic.response.text("Unhandled error", status=500)
self.slug = ''
self.content = ''
self.category = 'note'
self.meta = {}
self.dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
logging.debug("incoming micropub request:")
logging.debug(self.request.body)
logging.debug("** args:")
logging.debug(self.request.args)
logging.debug("** query string:")
logging.debug(self.request.query_string)
logging.debug("** headers:")
logging.debug(self.request.headers)
with open(os.path.join(glob.CACHE, "tags.json"), "r") as db:
self.existing_tags = json.loads(db.read())
db.close()
self._parse()
def _verify(self):
if 'q' in self.request.args:
if 'config' in self.request.args['q']:
self.response = sanic.response.json({
'tags': self.existing_tags
}, status=200)
return
if 'syndicate-to' in self.request.args['q']:
self.response = sanic.response.json({
'syndicate-to': []
}, status=200)
return
if not 'access_token' in self.request.form:
self.response = sanic.response.text("Mising access token", status=401)
return
token = self.request.form.get('access_token')
verify = requests.get(
'https://tokens.indieauth.com/token',
allow_redirects=False,
timeout=10,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Bearer %s' % (token)
});
if verify.status_code != requests.codes.ok:
self.response = sanic.response.text("Could not verify access token", status=500)
return False
response = urllib.parse.parse_qs(verify.text)
logging.debug(response)
if 'scope' not in response or 'me' not in response:
self.response = sanic.response.text("Could not verify access token", status=401)
return False
if '%s/' % (glob.conf['site']['url'].rstrip()) not in response['me']:
self.response = sanic.response.text("You can't post to this domain.", status=401)
return False
if 'post' not in response['scope'] and 'create' not in response['scope']:
self.response = sanic.response.text("Invalid scope", status=401)
return False
return True
def _parse(self):
if not self._verify():
return
if len(self.request.files):
self.response = sanic.response.text("File handling is not yet done", status=501)
return
#for ffield in self.request.files.keys():
#logging.info("got file field: %s" % ffield)
#f = self.request.files.get(ffield)
#logging.info("mime is: %s" % f.type)
#logging.info("ext should be: %s" % mimetypes.guess_extension(f.type))
##f.body
##f.type
##logging.info( f )
self.meta['published'] = self.dt.strftime('%Y-%m-%dT%H:%M:%S%z')
slug = None
if 'content' in self.request.form and len(self.request.form.get('content')):
self.content = self.request.form.get('content')
if 'summary' in self.request.form and len(self.request.form.get('summary')):
self.meta['summary'] = self.request.form.get('summary')
if 'slug' in self.request.form and len(self.request.form.get('slug')):
slug = self.request.form.get('slug')
if 'name' in self.request.form and len(self.request.form.get('name')):
self.meta['title'] = self.request.form.get('name')
if not slug:
slug = self.meta['title']
if 'in-reply-to' in self.request.form and len(self.request.form.get('in-reply-to')):
self.meta['in-reply-to'] = self.request.form.get('in-reply-to')
if not slug:
slug = 're: %s' % (self.meta['in-reply-to'])
if 'repost-of' in self.request.form and len(self.request.form.get('repost-of')):
self.meta['repost-of'] = self.request.form.get('repost-of')
self.category = 'bookmark'
if not slug:
slug = '%s' % (self.meta['repost-of'])
if 'bookmark-of' in self.request.form and len(self.request.form.get('bookmark-of')):
self.meta['bookmark-of'] = self.request.form.get('bookmark-of')
self.category = 'bookmark'
if not slug:
slug = '%s' % (self.meta['bookmark-of'])
if 'category[]' in self.request.form:
self.meta['tags'] = list(self.request.form['category[]'])
if 'summary' in self.meta and ('IT' in self.meta['tags'] or 'it' in self.meta['tags']):
self.category = 'article'
elif 'summary' in self.meta and ('journal' in self.meta['tags'] or 'Journal' in self.meta['tags']):
self.category = 'journal'
if not slug:
slug = singular.SingularHandler.baseN(calendar.timegm(self.dt.timetuple()))
self.slug = slugify(slug, only_ascii=True, lower=True)
self._write()
def _write(self):
fpath = os.path.join(glob.CONTENT, self.category, '%s.md' % (self.slug))
if os.path.isfile(fpath):
self.response = sanic.response.text("Update handling is not yet done", status=501)
return
logfile = os.path.join(glob.LOGDIR, "micropub-%s.log" % (self.dt.strftime("%Y-%m")))
with open (logfile, 'a') as micropublog:
logging.debug("logging micropub request")
micropublog.write("%s %s\n" % (self.dt.strftime('%Y-%m-%dT%H:%M:%S%z'), fpath))
micropublog.close()
with open (fpath, 'w') as mpf:
logging.info("writing file to: %s", fpath)
out = "---\n" + yaml.dump(self.meta, Dumper=yaml.RoundTripDumper, allow_unicode=True, indent=4) + "---\n\n" + self.content
mpf.write(out)
mpf.close()
self._git(fpath)
logging.info("trying to open and parse the received post")
post = singular.ArticleHandler(fpath, category=self.category)
post.write()
post.pings()
self.response = sanic.response.text(
"Post created",
status = 201,
headers = {
'Location': "%s/%s/" % (glob.conf['site']['url'], self.slug)
}
)
return
def _git(self, fpath):
logging.info("committing to git")
repo = Repo(glob.CONTENT)
author = Actor(glob.conf['author']['name'], glob.conf['author']['email'])
index = repo.index
newfile = fpath.replace(glob.CONTENT, '').lstrip('/')
index.add([newfile])
message = 'new content via micropub: %s' % (newfile)
index.commit(message, author=author, committer=author)
class SearchHandler(object):
def __init__ (self, query):
self.query = query
self.response = sanic.response.text("You seem to have forgot to enter what you want to search for. Please try again.", status=400)
if not query:
return
self._tmpl = glob.jinja2env.get_template('searchresults.html')
self._ix = index.open_dir(glob.SEARCHDB)
self._parse()
def _parse(self):
self.query = self.query.replace('+', ' AND ')
self.query = self.query.replace(' -', ' NOT ')
qp = qparser.MultifieldParser(
["title", "content", "tags"],
schema = glob.schema
)
q = qp.parse(self.query)
r = self._ix.searcher().search(q, sortedby="weight", limit=100)
logging.info("results for '%s': %i", self.query, len(r))
results = []
for result in r:
res = {
'title': result['title'],
'url': result['url'],
'highlight': result.highlights("content"),
}
if 'img' in result:
res['img'] = result['img']
results.append(res)
tvars = {
'term': self.query,
'site': glob.conf['site'],
'posts': results,
'taxonomy': {}
}
logging.info("collected %i results to render", len(results))
html = self._tmpl.render(tvars)
self.response = sanic.response.html(html, status=200)
class WebmentionHandler(object):
def __init__ ( self, source, target ):
self.source = source
self.target = target
self.time = arrow.utcnow().timestamp
logging.debug("validating: from: %s; to: %s" % (self.source, self.target) )
self.response = sanic.response.json({
'status': 'ok','msg': 'accepted',
}, 200)
self._validate()
self._parse()
self._archive()
self._send()
def _validate(self):
if not validators.url(self.source):
self.response = sanic.response.json({
'status': 'error','msg': '"souce" parameter is an invalid URL',
}, 400)
return
if not validators.url(self.target):
self.response = sanic.response.json({
'status': 'error','msg': '"target" parameter is an invalid URL',
}, 400)
return
_target = urllib.parse.urlparse(self.target)
_target_domain = '{uri.netloc}'.format(uri=_target)
if not _target_domain in glob.conf['accept_domains']:
self.response = sanic.response.json({
'status': 'error',
'msg': "%s' is not in the list of allowed domains" % (
_target_domain
)
}, 400)
return
_source = urllib.parse.urlparse(self.source)
_source_domain = '{uri.netloc}'.format(uri=_source)
if _source_domain == _target_domain and not glob.conf['allow_selfmention']:
self.response = sanic.response.json({
'status': 'error',
'msg': "selfpings are disabled"
}, 400)
return
return
def _parse(self):
if self.response.status != 200:
return
self._log()
self._source = urlinfo.UrlInfo(self.source)
if self._source.error:
logging.warning( "couldn't fetch %s; dropping webmention" % (self.source))
return
self.source = self._source.realurl
if not self._source.linksTo(self.target):
logging.warning( "%s is not linking to %s; dropping webmention" % (self.source, self.target))
return
self._target = urlinfo.UrlInfo(self.target)
if self._target.error:
logging.warning( "couldn't fetch %s; dropping webmention" % (self.target))
return
self.target = self._target.realurl
self.webmention = {
'author': self._source.author(),
'type': self._source.relationType(),
'target': self.target,
'source': self.source,
'date': arrow.get(self._source.pubDate()),
'content': pypandoc.convert_text(
self._source.content(),
to="markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
]),
format='html'
)
}
def _send(self):
if self.response.status != 200:
return
m = ToEmail(self.webmention)
m.send()
def _archive(self):
if self.response.status != 200:
return
fbase = self.webmention['date'].format('YYYY-MM-DD-HH-mm-ss')
fpath = self._archive_name(fbase)
archive = dict(self.webmention)
archive['date'] = archive['date'].format('YYYY-MM-DDTHH.mm.ssZ')
content = archive['content']
del(archive['content'])
with open (fpath, 'w') as f:
logging.info("writing file to: %s", fpath)
out = "---\n" + yaml.dump(
archive,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
indent=4
) + "---\n\n" + content
f.write(out)
f.close()
def _verify_archive(self, p):
archive = frontmatter.load(p)
if 'target' not in archive.metadata:
logging.warning('missing target')
return False
if 'source' not in archive.metadata:
logging.warning('missing source')
return False
if 'date' not in archive.metadata:
logging.warning('missing date')
return False
if archive.metadata['target'] != self.webmention['target']:
logging.warning('target different')
return False
if archive.metadata['source'] != self.webmention['source']:
logging.warning('source different')
return False
d = arrow.get(archive.metadata['date'])
if d.timestamp != self.webmention['date'].timestamp:
logging.warning('date different')
return False
# overwrite
return True
def _archive_name(self, archive, ext='.md'):
p = os.path.join(glob.COMMENTS, "%s%s" % (archive, ext))
if not os.path.exists(p):
logging.debug("%s doesn't exits yet" % p)
return p
logging.debug("%s exists, checking for update" % p)
if self._verify_archive(p):
return p
# another comment with the exact same second? why not.
names = [x for x in os.listdir(glob.COMMENTS) if x.startswith(archive)]
suffixes = [x.replace(archive, '').replace(ext, '').replace('.','') for x in names]
indexes = [int(x) for x in suffixes if x and set(x) <= set('0123456789')]
idx = 1
if indexes:
idx += sorted(indexes)[-1]
return os.path.join(glob.COMMENTS, "%s.%d%s" % (archive, idx, ext))
def _log(self):
if not os.path.isdir(glob.LOGDIR):
os.mkdir (glob.LOGDIR)
logfile = os.path.join(glob.LOGDIR, datetime.datetime.now().strftime("%Y-%m"))
s = json.dumps({
'time': self.time,
'source': self.source,
'target': self.target
})
with open(logfile, "a") as log:
logging.debug( "writing logfile %s with %s" % (logfile, s))
log.write("%s\n" % (s))
log.close()
class TimeSeriesHandler(object):
def __init__(self, tag):
if not os.path.isdir(glob.TSDBDIR):
os.mkdir(glob.TSDBDIR)
self.tag = tag
self.p = os.path.join(glob.TSDBDIR, '%s.csv' % (self.tag))
self.db = {}
#def _loaddb(self):
#if not os.path.isfile(self.p):
#return
#pattern = re.compile(r'^([0-9-\+:T]+)\s+(.*)$')
#searchfile = open(self.p, 'r')
#for line in searchfile:
#matched = re.match(pattern, line)
#if not matched:
#continue
#epoch = int(iso8601.parse_date(matched.group(1)).replace(tzinfo=pytz.utc).strftime('%s'))
#data = matched.group(2)
#self.db[epoch] = data
#searchfile.close()
#def _dumpdb(self):
#lines = []
#for e in self.db.items():
#epoch, data = e
#tstamp = datetime.datetime.utcfromtimestamp(epoch).replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
#line = '%s %s' % (tstamp, data)
#lines.append(line)
#bkp = '%s.bkp' % (self.p)
#shutil.copy(self.p, bkp)
#with open(self.p, "w") as searchfile:
#searchfile.write()
#del(cr)
#csvfile.close()
#os.unlink(bkp)
@staticmethod
def _common_date_base(d1, d2):
d1 = d1.replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
d2 = d2.replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
l = len(d1)
common = ''
for i in range(l):
if d1[i] == d2[i]:
common = common + d1[i]
else:
break
return common
def search(self, when, tolerance=1800):
when = when.replace(tzinfo=pytz.utc)
tolerance = int(tolerance/2)
minwhen = when - datetime.timedelta(seconds=tolerance)
maxwhen = when + datetime.timedelta(seconds=tolerance)
closest = None
mindiff = float('inf')
common = TimeSeriesHandler._common_date_base(minwhen, maxwhen)
pattern = re.compile(r'^(%s[0-9-\+:T]+)\s+(.*)$' % (common))
searchfile = open(self.p, 'r')
for line in searchfile:
matched = re.match(pattern, line)
if not matched:
continue
d = iso8601.parse_date(matched.group(1))
diff = d - when
diff = abs(diff.total_seconds())
if diff >= mindiff:
continue
mindiff = diff
closest = (d, matched.group(2))
searchfile.close()
return closest
def append(self, data, dt=None):
# evaluate the timestamp per call rather than once at import time
if dt is None:
dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
if os.path.isfile(self.p):
epoch = int(dt.strftime('%s'))
stat = os.stat(self.p)
if epoch < stat.st_mtime:
logging.warning('Refusing to append %s with old data' % self.p)
return
with open(self.p, 'a') as db:
db.write("%s %s\n" % (
dt.strftime(glob.ISODATE),
data
))
class DataHandler(object):
def __init__(self, request):
self.request = request
self.dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
self.response = sanic.response.text('accepted',status=200)
if not 'secrets' in glob.conf or \
not 'devices' in glob.conf['secrets']:
self.response = sanic.response.text(
'server configuration error',
status=501
)
return
if 'id' not in self.request.args:
self.response = sanic.response.text(
'device id not found in request',
status=401
)
return
id = self.request.args.get('id')
if id not in glob.conf['secrets']['devices'].keys():
self.response = sanic.response.text(
'device id rejected',
status=401
)
return
self.id = glob.conf['secrets']['devices'][id]
class OpenGTSHandler(DataHandler):
def __init__(self, *args, **kwargs):
super(OpenGTSHandler, self).__init__(*args, **kwargs)
self.lat = 0
self.lon = 0
self.alt = 0
self._parse()
self.l = '%s 0' % (self.dt.strftime(glob.ISODATE))
def _parse(self):
logging.debug('--- incoming location request ---')
logging.debug(self.request.args)
if 'latitude' in self.request.args and 'longitude' in self.request.args:
self.lat = float(self.request.args.get('latitude'))
self.lon = float(self.request.args.get('longitude'))
elif 'gprmc' in self.request.args:
gprmc = pynmea2.parse(self.request.args.get('gprmc'))
try:
self.lat = float(gprmc.latitude)
self.lon = float(gprmc.longitude)
except:
self.response = sanic.response.text(
"could not process gprmc string",
status=422
)
return
else:
self.response = sanic.response.text(
"no location information found in query",
status=401
)
return
if 'exclude_coordinates' in glob.conf['secrets']:
excl = {}
for t in ['lat', 'lon']:
excl[t] = []
if t in glob.conf['secrets']['exclude_coordinates']:
for c in glob.conf['secrets']['exclude_coordinates'][t]:
excl[t].append(float(c))
if round(self.lat,2) in excl['lat'] and round(self.lon,2) in excl['lon']:
self.response = sanic.response.text(
"this location is on the excluded list",
status=200
)
return
if 'loc_timestamp' in self.request.args and 'offset' in self.request.args:
# this is a bit ugly: first convert the epoch to datetime
# then append it with the offset as string
# and convert the string back to datetime from the iso8601 string
dt = datetime.datetime.utcfromtimestamp(int(self.request.args.get('loc_timestamp')))
dt = dt.strftime('%Y-%m-%dT%H:%M:%S')
dt = "%s%s" % (dt, self.request.args.get('offset'))
try:
self.dt = iso8601.parse_date(dt).replace(tzinfo=pytz.utc)
except:
pass
if 'altitude' in self.request.args:
self.alt = float(self.request.args.get('altitude'))
else:
try:
self.alt = OpenGTSHandler.altitude_from_bing(self.lat, self.lon)
except:
pass
self.lat = "{:4.6f}".format(float(self.lat))
self.lon = "{:4.6f}".format(float(self.lon))
self.alt = "{:4.6f}".format(float(self.alt))
l = '%s %s %s' % (self.lat, self.lon, self.alt)
gpsfile = TimeSeriesHandler('location')
gpsfile.append(l, dt=self.dt)
@staticmethod
def altitude_from_bing(lat, lon):
if 'bing_key' not in glob.conf['secrets']:
return 0
if not glob.conf['secrets']['bing_key']:
return 0
url = "http://dev.virtualearth.net/REST/v1/Elevation/List?points=%s,%s&key=%s" % (
lat,
lon,
glob.conf['secrets']['bing_key']
)
bing = requests.get(url)
bing = json.loads(bing.text)
if 'resourceSets' not in bing or \
'resources' not in bing['resourceSets'][0] or \
'elevations' not in bing['resourceSets'][0]['resources'][0] or \
not bing['resourceSets'][0]['resources'][0]['elevations']:
return 0
alt = float(bing['resourceSets'][0]['resources'][0]['elevations'][0])
del(bing)
del(url)
return alt
class SensorHandler(DataHandler):
def __init__(self, *args, **kwargs):
super(SensorHandler, self).__init__(*args, **kwargs)
self.data = 0
self.tag = ''
self._parse()
def _parse(self):
logging.debug('--- incoming sensor request ---')
logging.debug(self.request.args)
for tag in self.request.args:
if tag == 'id':
continue
datafile = TimeSeriesHandler('%s-%s' % (self.id, tag))
datafile.append(self.request.args.get(tag), dt=self.dt)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
app = Sanic()
@app.route("/webmention")
async def wm(request, methods=["POST"]):
source = request.form.get('source')
target = request.form.get('target')
r = WebmentionHandler(source, target)
return r.response
@app.route("/search")
async def search(request, methods=["GET"]):
query = request.args.get('s')
r = SearchHandler(query)
return r.response
@app.route("/micropub")
async def mpub(request, methods=["POST","GET"]):
r = MicropubHandler(request)
return r.response
@app.route("/opengts")
async def opengts(request, methods=["GET"]):
r = OpenGTSHandler(request)
return r.response
@app.route("/sensor")
async def sensor(request, methods=["GET"]):
r = SensorHandler(request)
return r.response
if __name__ == "__main__":
app.run(host="127.0.0.1", port=8000, debug=True)
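
A sketch of exercising the /webmention route with requests while the app above runs locally on 127.0.0.1:8000; both URLs are placeholders, and the target has to be on a domain listed under accept_domains in the configuration.

    import requests

    r = requests.post(
        'http://127.0.0.1:8000/webmention',
        data={
            'source': 'https://example.com/a-reply',     # page linking to the target
            'target': 'https://example.net/some-post/',  # must be on an accepted domain
        },
        timeout=30
    )
    print(r.status_code, r.json())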

916
singular.py Normal file

@@ -0,0 +1,916 @@
import os
import re
import sys
import collections
import logging
import glob
import img
import pypandoc
import langdetect
from cache import Cached
from slugify import slugify
from ruamel import yaml
from bs4 import BeautifulSoup
import frontmatter
from webmentiondb import WebmentionDB
import arrow
import json
import socket
import requests
import hashlib
import shutil
class SingularHandler(object):
def __init__(self, fpath, pingdb=WebmentionDB(), category='note'):
self.fpath = os.path.abspath(fpath)
path, fname = os.path.split(self.fpath)
fname, ext = os.path.splitext(fname)
self.fname = fname
self.fext = ext
self.ftime = os.stat(self.fpath)
self.target = os.path.join(glob.TARGET, "%s.html" % (self.fname))
basedir = os.path.join(glob.TARGET, "%s" % (self.fname))
if not os.path.isdir(basedir):
os.mkdir(basedir)
self.saved = os.path.join(glob.TARGET, "%s" % (self.fname), "saved.html")
self.pingdb = pingdb
self.title = ''
self.content = ''
self._content = ''
self.summary = ''
self.html = ''
self.sumhtml = ''
self.category = category
self.tags = []
self.reactions = {}
#self.date = datetime.datetime(1970, 1, 1).replace(tzinfo=pytz.utc)
self.date = arrow.get(0)
self.updated = None
self.dtime = 0
self.utime = 0
self.redirect = {}
self.exifmin = {}
self.lang = glob.conf['site']['lang']
self.syndicate = {}
self.syndications = []
self.template = 'singular.html'
self.slug = slugify(self.fname, only_ascii=True, lower=True)
self.shortslug = slugify(self.fname, only_ascii=True, lower=True)
self.img = None
self.srcset = ''
def __repr__(self):
return "Post '%s' (%s), category: %s" % (self.title,self.fname,self.category)
def _postsetup(self):
""" Shared post-setup - the initial thing, such at title, should be
set by the classes inheriting this one; these are only the common,
shared variables """
# set published epoch
#self.dtime = calendar.timegm(self.date.timetuple())
self.dtime = self.date.timestamp
# set updated epoch, if any and set the original file date according
# to either the updated or the published time
if self.updated:
#self.utime = calendar.timegm(self.updated.timetuple())
self.utime = self.updated.timestamp
if self.utime > 0 and self.utime != self.ftime.st_mtime:
os.utime(self.fpath, (self.utime, self.utime))
elif self.dtime > 0 and self.dtime != self.ftime.st_mtime:
os.utime(self.fpath, (self.dtime, self.dtime))
# generate shortslug from dtime if possible
if self.dtime > 0:
self.shortslug = SingularHandler.baseN(self.dtime)
self.redirect[self.shortslug] = 1
# detect post content language if possible
try:
self.lang = langdetect.detect("%s\n\n%s" % (self.title, self.content))
except:
pass
# make HTML from markdown via pandoc for the content and the summary
self.html = SingularHandler.pandoc_md2html(
self.content,
time=self.ftime
)
self.sumhtml = SingularHandler.pandoc_md2html(
self.summary,
time=self.ftime
)
self.url = "%s/%s" % (glob.conf['site']['url'], self.slug)
self.syndications = self.pingdb.posses(self.url)
#def urlsvg(self):
# import pyqrcode
# import tempfile
## generate qr code to the url
#qrname = tempfile.NamedTemporaryFile(prefix='pyqr_')
#qr = pyqrcode.create(self.url, error='L')
#qr.svg(
#qrname.name,
#xmldecl=False,
#omithw=True,
#scale=1,
#quiet_zone=0,
#svgclass='qr',
#lineclass='qrline'
#)
#with open(qrname.name) as f:
#qrsvg = f.read()
#f.close()
#return qrsvg
@staticmethod
def pandoc_md2html(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
extras = [
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
]
md = "markdown+" + "+".join(extras)
t = pypandoc.convert_text(t, to='html5', format=md)
cached.set(t)
return t
@staticmethod
def pandoc_html2md(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
t = pypandoc.convert_text(
t,
to="markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
]),
format='html'
)
cached.set(t)
return t
def tmpl(self):
return {
'title': self.title,
'published': self.date,
'tags': self.tags,
'author': glob.conf['author'],
'content': self.content,
'html': self.html,
'category': self.category,
'reactions': self.reactions,
'updated': self.updated,
'summary': self.sumhtml,
'exif': self.exifmin,
'lang': self.lang,
'syndicate': self.syndicate,
'slug': self.slug,
'shortslug': self.shortslug,
'srcset': self.srcset,
}
@staticmethod
def write_redirect(sslug, target, tstamp=arrow.utcnow().timestamp):
tmpl = glob.jinja2env.get_template('redirect.html')
jvars = {
'url': target
}
r = tmpl.render(jvars)
# this is to support / ending urls even for the redirects
dirs = [
os.path.join(glob.TARGET, sslug)
]
for d in dirs:
if not os.path.exists(d):
os.mkdir(d)
files = [
os.path.join(glob.TARGET, "%s.html" % (sslug)),
os.path.join(glob.TARGET, sslug, "index.html")
]
for f in files:
if os.path.isfile(f):
rtime = os.stat(f)
if tstamp == rtime.st_mtime:
logging.debug(
"Unchanged dates on redirect file %s", f
)
continue
with open(f, "w") as html:
logging.info("writing redirect file %s", f)
html.write(r)
html.close()
os.utime(f, (tstamp,tstamp))
def redirects(self):
""" Write redirect HTMLs """
if self.category == 'page':
return
for sslug in self.redirect.keys():
SingularHandler.write_redirect(sslug, self.url, self.ftime.st_mtime)
def write(self):
""" Write HTML file """
if os.path.isfile(self.target):
ttime = os.stat(self.target)
if self.ftime.st_mtime == ttime.st_mtime and not glob.FORCEWRITE:
logging.debug(
"Unchanged dates on %s; skipping rendering and writing",
self.fname
)
return
tmpl = glob.jinja2env.get_template(self.template)
logging.info("rendering %s", self.fname)
tmplvars = {
'post': self.tmpl(),
'site': glob.conf['site'],
'taxonomy': {},
}
r = tmpl.render(tmplvars)
soup = BeautifulSoup(r,"html5lib")
r = soup.prettify()
targets = [self.target]
for target in targets:
with open(target, "w") as html:
logging.info("writing %s", target)
html.write(r)
html.close()
os.utime(target, (self.ftime.st_mtime, self.ftime.st_mtime))
rdir = os.path.join(glob.TARGET, self.slug)
if not os.path.isdir(rdir):
os.mkdir(rdir)
altdst = os.path.join(glob.TARGET, self.slug, 'index.html')
altsrc = os.path.join('..', self.target)
if not os.path.islink(altdst):
if os.path.isfile(altdst):
os.unlink(altdst)
os.symlink(altsrc, altdst)
#links = []
#for r in self.reactions.items():
#reactiontype, urls = r
#if isinstance(urls, str):
#links.append(urls)
#elif isinstance(urls, list):
#links = [*links, *urls]
#if 1 == len(links):
#saved = os.path.join(glob.TARGET, self.slug, 'saved.html')
#if not os.path.isfile(saved):
#h, p = _localcopy_hashpath(links[0])
#c = self._get_localcopy(links[0], h, p)
#with open(saved, 'w') as f:
#f.write(c)
#f.close()
def index(self, ix):
""" Write search index """
writer = ix.writer()
c = "%s %s %s %s %s" % (
self.slug,
self.summary,
self._content,
yaml.dump(self.reactions, Dumper=yaml.RoundTripDumper),
yaml.dump(self.exifmin, Dumper=yaml.RoundTripDumper)
)
c = "%s %s" % (c, self._localcopy_include())
if self.img:
imgstr = self.img.mksrcset(generate_caption=False)
else:
imgstr = ''
writer.add_document(
title=self.title,
url=self.url,
content=c,
date=self.date.datetime,
tags=",".join(self.tags),
weight=1,
img=imgstr
)
writer.commit()
def pings(self):
""" Ping (webmention) all URLs found in the post """
links = []
urlregex = re.compile(
r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
)
matches = re.findall(urlregex, self.content)
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
matches.append(urls)
elif isinstance(urls, list):
matches = [*matches, *urls]
#for s in self.syndicate.keys():
#matches.append('https://brid.gy/publish/%s' % (s))
if self.utime and self.utime > 0:
time = self.utime
else:
time = self.dtime
if len(matches) > 0:
for link in matches:
if glob.conf['site']['domain'] in link:
continue
if link in links:
continue
#self._localcopy(link)
self.pingdb.ping(self.url, link, time)
links.append(link)
def _localcopy_hashpath(self,url):
h = hashlib.md5(url.encode('utf-8')).hexdigest()
p = os.path.join(glob.LOCALCOPIES, "%s.html" % (h))
return (h, p)
def _localcopy_include(self):
links = []
md = ''
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
links.append(urls)
elif isinstance(urls, list):
links = [*links, *urls]
for url in links:
h, p = self._localcopy_hashpath(url)
html = self._get_localcopy(url, h, p)
md = "%s %s" % (
md,
SingularHandler.pandoc_html2md(html, os.stat(p))
)
return md
def _get_localcopy(self, url, h, p):
html = ''
if os.path.isfile(p):
with open(p, 'r') as f:
html = f.read()
f.close()
else:
html = self._make_localcopy(url, h, p)
return html
def _make_localcopy(self, url, h, p):
post = self._pull_localcopy(url)
tmpl = glob.jinja2env.get_template('localcopy.html')
html = tmpl.render({'post': post})
soup = BeautifulSoup(html,"html5lib")
html = soup.prettify()
with open(p, "w") as f:
logging.info("saving readable copy of %s to %s", url, p)
f.write(html)
f.close()
return html
def _pull_localcopy(self, url):
# find the true URL
# MAYBE: add fallback to archive.org?
realurl = url
try:
pretest = requests.head(url, allow_redirects=True, timeout=30)
realurl = pretest.url
except:
pass
parsed = {
'lang': 'en',
'url': url,
'realurl': realurl,
'html': '',
'title': '',
'excerpt': '',
'byline': '',
}
if 'readable' in glob.conf and \
'port' in glob.conf['readable'] and \
'host' in glob.conf['readable']:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socktest = sock.connect_ex((
glob.conf['readable']['host'], int(glob.conf['readable']['port'])
))
if 0 == socktest:
text = self._localcopy_via_proxy(realurl)
parsed['html'] = text.get('content','')
parsed['title'] = text.get('title',url)
parsed['excerpt'] = text.get('excerpt', '')
parsed['byline'] = text.get('byline', '')
try:
parsed['lang'] = langdetect.detect(parsed['html'])
except:
pass
return parsed
# TODO: fallback to full-python solution if the previous failed
return parsed
def _localcopy_via_proxy(self, url):
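# assumption: the readability proxy exposes an /api/get?url=...&sanitize=y endpoint that returns JSON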
r = "http://%s:%s/api/get?url=%s&sanitize=y" % (
glob.conf['readable']['host'],
glob.conf['readable']['port'],
url
)
try:
req = requests.get(r, allow_redirects=False, timeout=60)
except:
return None
text = {}
try:
text = json.loads(req.text)
except:
pass
return text
def _adaptify(self):
""" Generate srcset for all images possible """
linkto = False
isrepost = None
if len(self.reactions.keys()):
isrepost = list(self.reactions.keys())[0]
if isrepost:
if len(self.reactions[isrepost]) == 1:
linkto = self.reactions[isrepost][0]
mdmatch = re.compile(
r'!\[.*\]\(.*?\.(?:jpe?g|png|gif)'
r'(?:\s+[\'\"]?.*?[\'\"]?)?\)(?:\{.*?\})?'
)
mdsplit = re.compile(
r'!\[(.*)\]\((?:\/(?:files|cache)'
r'(?:\/[0-9]{4}\/[0-9]{2})?\/(.*\.(?:jpe?g|png|gif)))'
r'(?:\s+[\'\"]?(.*?)[\'\"]?)?\)(?:\{(.*?)\})?'
)
mdimg = re.findall(mdmatch, self.content)
for i in mdimg:
m = re.match(mdsplit, i)
if m:
#logging.info(m.groups())
imgpath = os.path.join(glob.SFILES, m.group(2))
if not os.path.isfile(imgpath):
for c in glob.conf['category'].items():
catn, catd = c
catp = os.path.abspath(os.path.join(glob.CONTENT, catn))
if not os.path.exists(catp) \
or not 'type' in catd \
or catd['type'] != 'photo':
continue
imgpath = os.path.join(catp, m.group(2))
break
if os.path.isfile(imgpath):
t = ''
if m.group(3):
t = m.group(3)
cl = ''
if m.group(4):
cl = m.group(4)
a = ''
if m.group(1):
a = m.group(1)
im = img.ImageHandler(
imgpath,
alttext=a,
title=t,
imgcl=cl,
linkto=linkto
)
im.downsize()
logging.debug("replacing image %s with srcset", imgpath)
srcset = im.mksrcset()
if srcset:
self.content = self.content.replace(i, srcset)
del(im)
else:
logging.error("%s missing %s", m.group(2), self.fpath)
def _video(self):
""" [video] shortcode extractor """
match = re.compile(r'\[video mp4=\"/(?:files|cache).*?\"\]\[/video\]')
split = re.compile(r'\[video mp4=\"(/(?:files|cache)\/(.*?))\"\]\[/video\]')
videos = re.findall(match, self.content)
for vid in videos:
v = re.match(split, vid)
video = """
<video controls>
<source src="%s" type="video/mp4">
Your browser does not support the video tag.
</video>""" % (v.group(1))
self.content = self.content.replace(vid, video)
#def _files(self):
#""" Copy misc files referenced """
#match = re.compile(
#r'\s(?:%s)?/(?:files|cache)'
#r'/.*\.(?:(?!jpe?g|png|gif).*)\s' % (glob.conf['site']['domain'])
#)
#split = re.compile(
#r'\s(?:%s)?/((?:files|cache)'
#r'/(.*\.(?:(?!jpe?g|png|gif).*)))\s' % (glob.conf['site']['domain'])
#)
##files = re.findall(match, self.content)
##print(files)
def _snippets(self):
""" Replaces [git:(repo)/(file.ext)] with corresponding code snippet """
snmatch = re.compile(r'\[git:[^\/]+\/(?:.*\..*)\]')
snsplit = re.compile(r'\[git:([^\/]+)\/((?:.*)\.(.*))\]')
snippets = re.findall(snmatch, self.content)
isconf = re.compile(r'conf', re.IGNORECASE)
for snippet in snippets:
sn = re.match(snsplit, snippet)
if sn:
fpath = os.path.join(glob.SOURCE, sn.group(1), sn.group(2))
if not os.path.isfile(fpath):
logging.error(
"missing blogsnippet in %s: %s",
self.fpath,
fpath
)
continue
if re.match(isconf, sn.group(3)):
lang = 'apache'
else:
lang = sn.group(3)
with open(fpath, "r") as snip:
c = snip.read()
snip.close()
c = "\n\n```%s\n%s\n```\n" % (lang, c)
logging.debug("replacing blogsnippet %s", fpath)
self.content = self.content.replace(snippet, c)
@staticmethod
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
""" Used to create short, lowecase slug for a number (an epoch) passed """
num = int(num)
return ((num == 0) and numerals[0]) or (
SingularHandler.baseN(
num // b,
b,
numerals
).lstrip(numerals[0]) + numerals[num % b]
)
class ArticleHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(ArticleHandler, self).__init__(*args, **kwargs)
self.dctype = 'Text'
self._setup()
def _setup(self):
post = frontmatter.load(self.fpath)
self.meta = post.metadata
self.content = post.content
self._content = '%s' % (self.content)
if 'tags' in post.metadata:
self.tags = post.metadata['tags']
if 'title' in post.metadata:
self.title = post.metadata['title']
if 'published' in post.metadata:
self.date = arrow.get(post.metadata['published'])
if 'updated' in post.metadata:
self.updated = arrow.get(post.metadata['updated'])
if 'summary' in post.metadata:
self.summary = post.metadata['summary']
if 'redirect' in post.metadata and \
isinstance(post.metadata['redirect'], list):
for r in post.metadata['redirect']:
self.redirect[r] = 1
if 'syndicate' in post.metadata:
z = post.metadata['syndicate']
if isinstance(z, str):
self.syndicate[z] = ''
elif isinstance(z, dict):
for s, c in z.items():
self.syndicate[s] = c
elif isinstance(z, list):
for s in z:
self.syndicate[s] = ''
self.reactions = {}
# getting rid of '-' to avoid css trouble and similar
rmap = {
'bookmark-of': 'bookmark',
'repost-of': 'repost',
'in-reply-to': 'reply',
}
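# reactions always end up as lists of URLs, even when the front matter held a single string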
for x in rmap.items():
key, replace = x
if key in self.meta:
if isinstance(self.meta[key], str):
self.reactions[replace] = [self.meta[key]]
elif isinstance(self.meta[key], list):
self.reactions[replace] = self.meta[key]
self._adaptify()
self._snippets()
self._video()
#self._files()
super(ArticleHandler, self)._postsetup()
class PhotoHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PhotoHandler, self).__init__(*args, **kwargs)
self.dctype = 'Image'
self.img = img.ImageHandler(self.fpath)
self.exif = self.img.exif
self._setup()
def _setup(self):
self.syndicate = {
'flickr': '',
}
keywords = [
'XMP:Keywords',
'IPTC:Keywords'
]
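# tags come from the XMP/IPTC keyword fields; comma-separated strings are split and deduplicated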
tags = {}
for key in keywords:
if key in self.exif and self.exif[key]:
if isinstance(self.exif[key], str):
self.exif[key] = self.exif[key].split(",")
if isinstance(self.exif[key], list):
for tag in self.exif[key]:
tags[str(tag).strip()] = 1
self.tags = list(tags.keys())
# content
keywords = [
'XMP:Description',
'IPTC:Caption-Abstract'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.content = self.exif[key]
break
self._content = '%s' % (self.content)
# title
keywords = [
'XMP:Title',
'XMP:Headline',
'IPTC:Headline'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.title = self.exif[key]
break
# datetime
keywords = [
'XMP:DateTimeDigitized',
'XMP:CreateDate',
'EXIF:CreateDate',
'EXIF:ModifyDate'
]
pattern = re.compile(
r"(?P<Y>[0-9]{4}):(?P<M>[0-9]{2}):(?P<D>[0-9]{2})\s+"
r"(?P<T>[0-9]{2}:[0-9]{2}:[0-9]{2})Z?"
)
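# EXIF timestamps look like "2017:05:23 11:10:30"; the first field that parses becomes the post date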
for key in keywords:
if key not in self.exif or not self.exif[key]:
continue
date = None
m = pattern.match(self.exif[key])
v = m.groupdict() if m else None
if not v:
continue
try:
date = arrow.get('%s-%s-%s %s' % (v['Y'], v['M'], v['D'], v['T']))
except:
continue
if date:
self.date = date
logging.debug("date for %s is set to %s from key %s", self.fname, self.date, key)
break
self.img.title = self.title
self.img.alttext = self.content
self.content = self.content + "\n\n" + self.img.mksrcset(generate_caption=False, uphoto=True)
self.img.downsize()
self.srcset = self.img.mksrcset(generate_caption=False, uphoto=False)
super(PhotoHandler, self)._postsetup()
def tmpl(self):
tmpl = super(PhotoHandler, self).tmpl()
tmpl['exif'] = {}
mapping = {
'camera': [
'EXIF:Model'
],
'aperture': [
'EXIF:FNumber',
'Composite:Aperture'
],
'shutter_speed': [
'EXIF:ExposureTime'
],
'focallength': [
'EXIF:FocalLength',
'Composite:FocalLength35efl',
],
'iso': [
'EXIF:ISO'
],
'lens': [
'Composite:LensID',
'MakerNotes:Lens',
'Composite:LensSpec'
]
}
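# for every template field use the first EXIF tag that is actually present in the file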
for ekey, candidates in mapping.items():
for candidate in candidates:
if candidate in self.exif:
tmpl['exif'][ekey] = self.exif[candidate]
break
gps = ['Latitude', 'Longitude']
for g in gps:
gk = 'EXIF:GPS%s' % (g)
if gk not in self.exif:
continue
r = 'EXIF:GPS%sRef' % (g)
ref = None
if r in self.exif:
ref = self.exif[r]
tmpl['exif']['geo_%s' % (g.lower())] = self.gps2dec(
self.exif[gk],
ref
)
##tmpl['imgurl'] = ''
#sizes = collections.OrderedDict(reversed(list(self.img.sizes.items())))
#for size, meta in sizes.items():
#if os.path.isfile(meta['path']):
#with Image.open(meta['path']) as im:
#meta['width'], meta['height'] = im.size
#meta['size'] = os.path.getsize(meta['path'])
#tmpl['img'] = meta
#break
tmpl['img'] = self.img.meta
return tmpl
@staticmethod
def gps2dec(exifgps, ref=None):
pattern = re.compile(r"(?P<deg>[0-9.]+)\s+deg\s+(?P<min>[0-9.]+)'\s+(?P<sec>[0-9.]+)\"(?:\s+(?P<dir>[NEWS]))?")
v = pattern.match(exifgps).groupdict()
dd = float(v['deg']) + (((float(v['min']) * 60) + (float(v['sec']))) / 3600)
if ref == 'West' or ref == 'South' or v['dir'] == "S" or v['dir'] == "W":
dd = dd * -1
return round(dd, 6)
class PageHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PageHandler, self).__init__(*args, **kwargs)
self._setup()
def _setup(self):
with open(self.fpath) as c:
self.content = c.read()
c.close()
self._content = '%s' % (self.content)
self._adaptify()
super(PageHandler, self)._postsetup()
self.template = 'page.html'

253
taxonomy.py Normal file
View file

@@ -0,0 +1,253 @@
import math
import logging
import os
import collections
import json
import glob
from slugify import slugify
from bs4 import BeautifulSoup
from pprint import pprint
class TaxonomyHandler(object):
def __init__(self, taxonomy='', name='', description='', exclude=False):
self.taxonomy = taxonomy
self.name = name
self.description = description
self.exclude = exclude
self.slug = slugify(self.name, only_ascii=True, lower=True)
self.posts = collections.OrderedDict()
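# posts are keyed by publish timestamp; append() re-sorts the dict newest-first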
self.taxp = os.path.join(glob.TARGET, self.taxonomy)
self.simplepath = os.path.join(self.taxp, 'index.html')
self.basep = os.path.join(self.taxp, self.slug)
self.pagedp = os.path.join(self.basep, 'page')
self.indexpath = os.path.join(self.basep, 'index.html')
self.lptime = 0
def __getitem__(self, key):
return self.posts[key]
def __repr__(self):
return 'Taxonomy %s (name: %s, slug: %s) with %i posts' % (
self.taxonomy,
self.name,
self.slug,
len(self.posts)
)
def __next__(self):
try:
r = next(iter(self.posts.values()))
except StopIteration:
raise StopIteration()
return r
def __iter__(self):
for ix, post in self.posts.items():
yield post
return
def append(self, post):
k = int(post.date.timestamp)
while k in self.posts:
k = int(k + 1)
self.posts[k] = post
self.posts = collections.OrderedDict(sorted(self.posts.items(), reverse=True))
def index(self, ix):
""" Write search index """
writer = ix.writer()
t, lp = list(self.posts.items())[0]
writer.add_document(
title=self.name,
url="%s/%s/%s" % (glob.conf['site']['url'], self.taxonomy, self.slug),
content="%s %s" % (self.name, self.slug),
date=lp.date.datetime,
tags=",".join([self.name]),
weight=10
)
writer.commit()
def _test_freshness(self):
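# compare the mtime of the newest post with the rendered index; if they match, the archive is up to date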
t, lp = list(self.posts.items())[0]
self.lptime = lp.ftime.st_mtime
if os.path.isfile(self.indexpath):
p = self.indexpath
elif os.path.isfile(self.simplepath):
p = self.simplepath
else:
return False
itime = os.stat(p)
if itime.st_mtime == self.lptime and not glob.FORCEWRITE:
logging.debug(
'Taxonomy tree is fresh for %s' % (self.name)
)
return True
return False
def _test_dirs(self):
if not os.path.isdir(self.taxp):
os.mkdir(self.taxp)
if not os.path.isdir(self.basep):
os.mkdir(self.basep)
def write_paginated(self):
if self._test_freshness():
return
self._test_dirs()
taxp = os.path.join(glob.TARGET, self.taxonomy)
basep = os.path.join(glob.TARGET, self.taxonomy, self.slug)
if not os.path.isdir(taxp):
os.mkdir(taxp)
if not os.path.isdir(basep):
os.mkdir(basep)
pages = math.ceil(len(self.posts) / glob.conf['perpage'])
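# page 1 is written to index.html and also gets the RSS feed; later pages land in page/<n>/index.html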
page = 1
if len(self.taxonomy) and len(self.slug):
base_url = "/%s/%s/" % (self.taxonomy, self.slug)
else:
base_url = '/'
while page <= pages:
start = int((page-1) * int(glob.conf['perpage']))
end = int(start + int(glob.conf['perpage']))
do_rss = False
posttmpls = [self.posts[k].tmpl() for k in list(sorted(
self.posts.keys(), reverse=True))[start:end]]
if page == 1:
tpath = self.indexpath
do_rss = True
# RSS
else:
do_rss = False
if not os.path.isdir(self.pagedp):
os.mkdir(self.pagedp)
tdir = os.path.join(self.pagedp, "%d" % page)
if not os.path.isdir(tdir):
os.mkdir(tdir)
tpath = os.path.join(tdir, "index.html")
tvars = {
'taxonomy': {
'url': base_url,
'name': self.name,
'taxonomy': self.taxonomy,
'description': self.description,
'paged': page,
'total': pages,
'perpage': glob.conf['perpage'],
},
'site': glob.conf['site'],
'posts': posttmpls,
}
tmpl = glob.jinja2env.get_template('archive.html')
logging.info("rendering %s" % (tpath))
with open(tpath, "w") as html:
r = tmpl.render(tvars)
soup = BeautifulSoup(r, "html5lib")
r = soup.prettify()
logging.info("writing %s" % (tpath))
html.write(r)
html.close()
os.utime(tpath, (self.lptime, self.lptime))
if do_rss:
feeddir = os.path.join(self.basep, 'feed')
if not os.path.isdir(feeddir):
os.mkdir(feeddir)
feedpath = os.path.join(feeddir, "index.xml")
tmpl = glob.jinja2env.get_template('rss.html')
logging.info("rendering %s" % (feedpath))
with open(feedpath, "w") as html:
r = tmpl.render(tvars)
logging.info("writing %s" % (feedpath))
html.write(r)
html.close()
os.utime(feedpath, (self.lptime, self.lptime))
page = page+1
def write_simple(self, template='archive.html'):
if self._test_freshness():
return
self._test_dirs()
base_url = "/%s/" % (self.slug)
posttmpls = [self.posts[k].tmpl() for k in list(sorted(
self.posts.keys(), reverse=True))]
tvars = {
'taxonomy': {
'url': base_url,
'name': self.name,
'taxonomy': self.taxonomy,
'description': self.description,
'paged': 0,
'total': 0,
'perpage': glob.conf['perpage'],
},
'site': glob.conf['site'],
'posts': posttmpls,
}
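# currently dumps the template variables as JSON; the gallery template rendering below is commented out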
with open(self.simplepath, "w") as html:
html.write(json.dumps(tvars, indent=4, sort_keys=True, default=str))
html.close()
#tmpl = glob.jinja2env.get_template('gallery.html')
#logging.info("rendering %s" % (indexpath))
#with open(indexpath, "w") as html:
#r = tmpl.render(tvars)
#soup = BeautifulSoup(r, "html5lib")
#r = soup.prettify()
#logging.info("writing %s" % (indexpath))
#html.write(r)
#html.close()
#os.utime(indexpath, (lptime, lptime))
def writesitemap(self):
sitemap = "%s/sitemap.txt" % (glob.TARGET)
urls = []
for p in self.posts.items():
t, data = p
urls.append( "%s/%s" % ( glob.conf['site']['url'], data.slug ) )
with open(sitemap, "w") as f:
logging.info("writing %s" % (sitemap))
f.write("\n".join(urls))
f.close()

20
update.sh Executable file
View file

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
if [ -f "/tmp/petermolnar.net.generator.lock" ]; then
exit 0;
fi;
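# the newest markdown file under the source tree decides whether the generator needs to run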
lastfile="$(find /home/petermolnar.net/source/ -type f -name *.md -printf '%T+ %p\n' | sort | tail -n1 | awk '{print $2}')";
lastfilemod=$(stat -c %Y "$lastfile");
lastrunfile="/tmp/generator_last_run";
lastrun=0;
if [ -f "$lastrunfile" ]; then
lastrun=$(stat -c %Y "$lastrunfile");
fi;
if [ "$lastrun" -lt "$lastfilemod" ]; then
cd /home/petermolnar.net/src; ../.venv/bin/python3.5 generator.py;
fi;
exit 0;

103
webmentiondb.py Normal file
View file

@@ -0,0 +1,103 @@
import os
import hashlib
import logging
import glob
from webmentiontools.send import WebmentionSend
import requests
import json
class WebmentionDB(object):
dbpath = glob.WEBMENTIONDB
def __init__(self):
self.sent = {}
self._loaddb()
def _loaddb(self):
if os.path.isfile(self.dbpath):
logging.info("loading pinged database")
with open(self.dbpath, 'r') as db:
self.sent = json.loads(db.read())
def _dumpdb(self):
with open(self.dbpath, "w") as db:
logging.info("writing pinged database")
db.write(json.dumps(self.sent, indent=4, sort_keys=True))
db.close()
def _refreshdb(self):
self._dumpdb()
self._loaddb()
def __getitem__(self, key):
r = {}
for i in self.sent.items():
h, data = i
if data['source'] == key:
r[data['target']] = {
'time': data['time'],
'response': data['response']
}
return r
def __len__(self):
return len(self.sent)
def posses(self, key):
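# collect the syndicated copy (POSSE) URLs that webmention endpoints reported back for this source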
r = []
for i in self.sent.items():
h, data = i
if data['source'] != key:
continue
if not len(data['response']):
continue
if 'url' not in data['response']:
continue
r.append(data['response']['url'])
return r
def ping(self, source, target, time=0, posse=False):
resp = {}
source = source.strip()
target = target.strip()
h = source + target + "%i" % (int(time))
h = h.encode('utf-8')
h = hashlib.sha1(h).hexdigest()
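# the SHA1 of source + target + mtime deduplicates pings; an updated post gets a new hash and a fresh ping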
if h in self.sent.keys():
logging.debug("already pinged: %s" % (target))
return True
logging.debug("pinging: %s" % (target))
wm = WebmentionSend(source, target)
wm.send()
if hasattr(wm, 'response'):
resp = wm.response
# fire and forget archive.org call
try:
verify = requests.get(
'%s%s' % ('https://web.archive.org/save/', target),
allow_redirects=False,
timeout=30,
)
except:
pass
self.sent[h] = {
'source': source,
'target': target,
'time': time,
'response': resp
}
self._refreshdb()