never finished 1.5 version

Peter Molnar 2017-05-23 11:13:35 +01:00
parent f5c599cef9
commit 82db390786
26 changed files with 1783 additions and 3173 deletions

1
.gitignore vendored

@@ -101,3 +101,4 @@ ENV/
.mypy_cache/
config.ini
config.yml
nasg/config.py

56
cache.py

@@ -1,56 +0,0 @@
import os
import json
import hashlib
import logging
import glob
class Cached(object):
def __init__(self, hash='', text='', stime=0):
if not os.path.isdir(glob.CACHE):
os.mkdir(glob.CACHE)
if hash:
self._hbase = hash
elif text:
self._hbase = hashlib.sha1(text.encode('utf-8')).hexdigest()
else:
print("No identifier passed for Cached")
raise
self._cpath = os.path.join(glob.CACHE, self._hbase)
self._stime = stime
if os.path.isfile(self._cpath):
self._ctime = os.stat(self._cpath)
else:
self._ctime = None
def get(self):
if not glob.CACHEENABLED:
return None
cached = ''
if os.path.isfile(self._cpath):
if self._stime and self._stime.st_mtime == self._ctime.st_mtime:
logging.debug("Cache exists at %s; using it" % (self._cpath ))
with open(self._cpath, 'r') as c:
cached = c.read()
c.close()
# invalidate old
elif self._stime and self._stime.st_mtime > self._ctime.st_mtime:
logging.debug("invalidating cache at %s" % (self._cpath ))
os.remove(self._cpath)
return cached
def set(self, content):
if not glob.CACHEENABLED:
return None
with open(self._cpath, "w") as c:
logging.debug("writing cache to %s" % (self._cpath ))
c.write(content)
c.close()
if self._stime:
os.utime(self._cpath, (self._stime.st_mtime, self._stime.st_mtime ))
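A minimal usage sketch of the Cached helper above (removed in this commit), assuming glob.CACHE and glob.CACHEENABLED are set up as in the old glob.py; the source path is made up for illustration:

import os
from cache import Cached

src = 'content/article/example.md'        # hypothetical source file
stime = os.stat(src)                      # mtime drives cache invalidation
cached = Cached(text=src, stime=stime)
html = cached.get()                       # '' on miss or stale cache
if not html:
    html = '<p>freshly rendered</p>'      # stand-in for the expensive render step
    cached.set(html)                      # stored under sha1(src) inside glob.CACHE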

293
generator.py

@@ -1,293 +0,0 @@
#!/home/petermolnar.net/.venv/bin/python3.5
"""Usage: generator.py [-h] [-f] [-g] [-p] [-d] [-s FILE]
-h --help show this
-f --force force HTML file rendering
-p --pandoc force re-rendering content HTML
-g --regenerate regenerate images
-s --single FILE only (re)generate a single entity
-d --debug set logging level
"""
import os
import shutil
import logging
import atexit
import json
import sys
import tempfile
import glob
from whoosh import index
from docopt import docopt
from ruamel import yaml
from webmentiontools.send import WebmentionSend
import taxonomy
import singular
from slugify import slugify
import arrow
class Engine(object):
lockfile = "/tmp/petermolnar.net.generator.lock"
def __init__(self):
if os.path.isfile(self.lockfile):
raise ValueError("Lockfile %s is present; generator won't run.")
else:
with open(self.lockfile, "w") as lock:
lock.write(arrow.utcnow().format())
lock.close()
atexit.register(self.removelock)
atexit.register(self.removetmp)
self._mkdirs()
self.tags = {}
self.category = {}
self.allposts = None
self.frontposts = None
self.slugsdb = os.path.join(glob.CACHE, "slugs.json")
if os.path.isfile(self.slugsdb):
with open(self.slugsdb) as slugsdb:
self.allslugs = json.loads(slugsdb.read())
slugsdb.close()
else:
self.allslugs = []
self.tmpwhoosh = tempfile.mkdtemp('whooshdb_', dir=tempfile.gettempdir())
self.whoosh = index.create_in(self.tmpwhoosh, glob.schema)
def removelock(self):
os.unlink(self.lockfile)
def removetmp(self):
if os.path.isdir(self.tmpwhoosh):
for root, dirs, files in os.walk(self.tmpwhoosh, topdown=False):
for f in files:
os.remove(os.path.join(root, f))
for d in dirs:
os.rmdir(os.path.join(root, d))
def initbuilder(self):
self._copy_and_compile()
def cleanup(self):
with open(os.path.join(glob.CACHE, "slugs.json"), "w") as db:
logging.info("updating slugs database")
db.write(json.dumps(self.allslugs))
db.close()
tags = []
for tslug, tax in self.tags.items():
tags.append(tax.name)
with open(os.path.join(glob.CACHE, "tags.json"), "w") as db:
logging.info("updating tags database")
db.write(json.dumps(tags))
db.close()
logging.info("deleting old searchdb")
shutil.rmtree(glob.SEARCHDB)
logging.info("moving new searchdb")
shutil.move(self.tmpwhoosh, glob.SEARCHDB)
def _mkdirs(self):
for d in [glob.TARGET, glob.TFILES, glob.TTHEME, glob.CACHE]:
if not os.path.isdir(d):
os.mkdir(d)
def _copy_and_compile(self):
for f in os.listdir(glob.STHEME):
p = os.path.join(glob.STHEME, f)
if os.path.isdir(p):
try:
shutil.copytree(p, os.path.join(glob.TTHEME, f))
except FileExistsError:
pass
else:
path, fname = os.path.split(p)
fname, ext = os.path.splitext(fname)
logging.debug("copying %s", p)
shutil.copy(p, os.path.join(glob.TTHEME, f))
@staticmethod
def postbycategory(fpath, catd=None, catn=None):
if catd == 'photo':
post = singular.PhotoHandler(fpath, category=catn)
elif catd == 'page':
post = singular.PageHandler(fpath)
else:
post = singular.ArticleHandler(fpath, category=catn)
return post
def collect(self):
self.allposts = taxonomy.TaxonomyHandler()
#self.gallery = taxonomy.TaxonomyHandler(taxonomy="photography", name="Photography")
self.frontposts = taxonomy.TaxonomyHandler()
for category in glob.conf['category'].items():
catn, catd = category
catp = os.path.abspath(os.path.join(glob.CONTENT, catn))
if not os.path.exists(catp):
continue
logging.debug("getting posts for category %s from %s", catn, catp)
cat = taxonomy.TaxonomyHandler(taxonomy='category', name=catn)
self.category[catn] = cat
for f in os.listdir(catp):
fpath = os.path.join(catp, f)
if not os.path.isfile(fpath):
continue
logging.debug("parsing %s", fpath)
exclude = False
if 'exclude' in catd:
exclude = bool(catd['exclude'])
ct = None
if 'type' in catd:
ct = catd['type']
post = Engine.postbycategory(fpath, catd=ct, catn=catn)
self.allposts.append(post)
if post.dtime > arrow.utcnow().timestamp:
logging.warning(
"Post '%s' will be posted in the future; "
"skipping it from Taxonomies for now", fpath
)
else:
cat.append(post)
if not exclude:
self.frontposts.append(post)
if hasattr(post, 'tags') and isinstance(post.tags, list):
for tag in post.tags:
tslug = slugify(tag, only_ascii=True, lower=True)
if not tslug in self.tags.keys():
t = taxonomy.TaxonomyHandler(taxonomy='tag', name=tag)
self.tags[tslug] = t
else:
t = self.tags[tslug]
t.append(post)
elif not hasattr(post, 'tags'):
logging.error("%s post does not have tags", post.fname)
elif not isinstance(post.tags, list):
logging.error(
"%s tags are not a list, it's %s ",
post.fname,
type(post.tags)
)
for r in post.redirect.keys():
self.allslugs.append(r)
self.allslugs.append(post.fname)
def renderposts(self):
for p in self.allposts.posts.items():
time, post = p
post.write()
post.redirects()
post.pings()
post.index(self.whoosh)
def rendertaxonomies(self):
for t in [self.tags, self.category]:
for tname, tax in t.items():
if glob.conf['category'].get(tname, False):
if glob.conf['category'][tname].get('nocollection', False):
logging.info("skipping taxonomy '%s' due to config nocollections", tname)
continue
tax.write_paginated()
tax.index(self.whoosh)
self.frontposts.write_paginated()
#self.gallery.write_simple(template='gallery.html')
self.allposts.writesitemap()
def globredirects(self):
redirects = os.path.join(glob.CONTENT,'redirects.yml')
if not os.path.isfile(redirects):
return
ftime = os.stat(redirects)
rdb = {}
with open(redirects, 'r') as db:
rdb = yaml.safe_load(db)
db.close()
for r_ in rdb.items():
target, slugs = r_
for slug in slugs:
singular.SingularHandler.write_redirect(
slug,
"%s/%s" % (glob.conf['site']['url'], target),
ftime.st_mtime
)
def recordlastrun(self):
if os.path.exists(glob.lastrun):
t = arrow.utcnow().timestamp
os.utime(glob.lastrun, (t,t))
else:
open(glob.lastrun, 'a').close()
if __name__ == '__main__':
args = docopt(__doc__, version='generator.py 0.2')
if args['--pandoc']:
glob.CACHEENABLED = False
if args['--force']:
glob.FORCEWRITE = True
if args['--regenerate']:
glob.REGENERATE = True
logform = '%(asctime)s - %(levelname)s - %(message)s'
if args['--debug']:
loglevel = 10
else:
loglevel = 40
while len(logging.root.handlers) > 0:
logging.root.removeHandler(logging.root.handlers[-1])
logging.basicConfig(level=loglevel, format=logform)
if args['--single']:
logging.info("(re)generating a single item only")
path = args['--single'].split('/')
fpath = os.path.join(glob.CONTENT, path[0], path[1])
post = Engine.postbycategory(fpath, catd=path[0])
post.pings()
post.write()
sys.exit(0)
else:
eng = Engine()
eng.initbuilder()
eng.collect()
eng.renderposts()
eng.globredirects()
eng.rendertaxonomies()
eng.recordlastrun()
eng.cleanup()

109
glob.py

@@ -1,109 +0,0 @@
import os
import logging
from ruamel import yaml
from whoosh import fields
from whoosh import analysis
import jinja2
from slugify import slugify
import arrow
schema = fields.Schema(
url=fields.ID(
stored=True,
),
title=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
date=fields.DATETIME(
stored=True,
sortable=True
),
content=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
tags=fields.TEXT(
stored=True,
analyzer=analysis.KeywordAnalyzer(
lowercase=True,
commas=True
)
),
weight=fields.NUMERIC(
sortable=True
),
img=fields.TEXT(
stored=True
)
)
BASEDIR = os.path.dirname(os.path.abspath(__file__))
CONFIG = os.path.abspath(os.path.join(BASEDIR, 'config.yml'))
with open(CONFIG, 'r') as c:
conf = yaml.safe_load(c)
conf['site']['author'] = conf['author']
c.close()
secrets = os.path.abspath(os.path.join(BASEDIR, 'secret.yml'))
if os.path.isfile(secrets):
with open(secrets, 'r') as c:
conf['secrets'] = yaml.safe_load(c)
c.close()
CACHEENABLED = True
REGENERATE = False
FORCEWRITE = False
ISODATE = '%Y-%m-%dT%H:%M:%S%z'
SOURCE = os.path.abspath(conf['dirs']['source']['root'])
CONTENT = os.path.abspath(conf['dirs']['source']['content'])
FONT = os.path.abspath(conf['dirs']['font'])
STHEME = os.path.abspath(conf['dirs']['source']['theme'])
SFILES = os.path.abspath(conf['dirs']['source']['files'])
TEMPLATES = os.path.abspath(conf['dirs']['source']['templates'])
COMMENTS = os.path.abspath(conf['dirs']['source']['comments'])
TARGET = os.path.abspath(conf['dirs']['target']['root'])
TTHEME = os.path.abspath(conf['dirs']['target']['theme'])
TFILES = os.path.abspath(conf['dirs']['target']['files'])
UFILES = conf['dirs']['target']['furl']
CACHE = os.path.abspath(conf['dirs']['cache'])
SEARCHDB = os.path.abspath(conf['dirs']['searchdb'])
WEBMENTIONDB = os.path.abspath(conf['webmentiondb'])
LOGDIR = os.path.abspath(conf['dirs']['log'])
GPSDIR = os.path.abspath(conf['dirs']['gps'])
TSDBDIR = os.path.abspath(conf['dirs']['tsdb'])
LOCALCOPIES = os.path.abspath(conf['dirs']['localcopies'])
lastrun = '/tmp/generator_last_run'
os.environ.setdefault('PYPANDOC_PANDOC', '/usr/bin/pandoc')
def jinja_filter_date(d, form='%Y-%m-%d %H:%M:%S'):
if d == 'now':
return arrow.now().strftime(form)
if form == 'c':
form = '%Y-%m-%dT%H:%M:%S%z'
return d.strftime(form)
def jinja_filter_slugify(s):
return slugify(s, only_ascii=True, lower=True)
def jinja_filter_search(s, r):
if r in s:
return True
return False
jinjaldr = jinja2.FileSystemLoader(searchpath=TEMPLATES)
jinja2env = jinja2.Environment(loader=jinjaldr)
jinja2env.filters['date'] = jinja_filter_date
jinja2env.filters['search'] = jinja_filter_search
jinja2env.filters['slugify'] = jinja_filter_slugify

370
img.py

@@ -1,370 +0,0 @@
import os
import re
import sys
import json
import shutil
import collections
import logging
import imghdr
from ctypes import c_void_p, c_size_t
import glob
import pyexifinfo
from similar_text import similar_text
from cache import Cached
import wand.api
import wand.image
import wand.drawing
import wand.color
from PIL import Image
#from subprocess import call
# https://stackoverflow.com/questions/34617422/how-to-optimize-image-size-using-wand-in-python
wand.api.library.MagickSetCompressionQuality.argtypes = [c_void_p, c_size_t]
class ImageHandler(object):
def __init__(self, fpath, alttext='', title='', imgcl='', linkto=False):
self.fpath = os.path.abspath(fpath)
path, fname = os.path.split(self.fpath)
fname, ext = os.path.splitext(fname)
self.fname = fname
self.fext = ext
self.ftime = os.stat(self.fpath)
self.linkto = linkto
self.alttext = alttext
self.title = title
self.imgcl = imgcl
self.c = os.path.join(glob.TFILES, self.fname)
self.u = "%s/%s/%s" % (glob.conf['site']['url'],glob.UFILES, self.fname)
self.what = imghdr.what(self.fpath)
self.meta = {}
self.exif = {}
if self.what == 'jpeg':
self._setexif()
self.watermark = ''
wfile = os.path.join(glob.SOURCE, glob.conf['watermark'])
if os.path.isfile(wfile):
self.watermark = wfile
sizes = {
90: {
'ext': 's',
'crop': True,
},
360: {
'ext': 'm',
},
#540: 'n',
720: {
'ext': 'z',
},
#980: 'c',
1280: {
'ext': 'b',
}
}
self.sizes = collections.OrderedDict(sorted(sizes.items(), reverse=0))
for size, meta in self.sizes.items():
meta['path'] = "%s_%s%s" % (self.c, meta['ext'], self.fext)
meta['url'] = "%s_%s%s" % (self.u, meta['ext'], self.fext)
meta['mime'] = "image/%s" % (self.what)
self._setmeta()
self.fallbacksize = 720
self.srcsetmin = 720
self._is_photo()
if self.is_photo:
self.srcset = self.mksrcset(generate_caption=False, uphoto=False)
def _setmeta(self):
s = collections.OrderedDict(reversed(list(self.sizes.items())))
for size, meta in s.items():
if os.path.isfile(meta['path']):
with Image.open(meta['path']) as im:
meta['width'], meta['height'] = im.size
meta['size'] = os.path.getsize(meta['path'])
self.meta = meta
break
def downsize(self, liquidcrop=True, watermark=True):
if not self._is_downsizeable():
return self._copy()
if not self._isneeded():
logging.debug("downsizing not needed for %s", self.fpath)
return
logging.debug("downsizing %s", self.fpath)
try:
img = wand.image.Image(filename=self.fpath)
img.auto_orient()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
# watermark
if self.is_photo and self.watermark and img.format == "JPEG" and watermark:
img = self._watermark(img)
elif self.linkto:
img = self._sourceurlmark(img)
# resize & cache
for size, meta in self.sizes.items():
self._intermediate(img, size, meta, liquidcrop)
self._setmeta()
def _setexif(self):
cached = Cached(text=self.fname, stime=self.ftime)
cexif = cached.get()
if cexif:
self.exif = json.loads(cexif)
else:
exif = pyexifinfo.get_json(self.fpath)
self.exif = exif.pop()
cached.set(json.dumps(self.exif))
def _is_photo(self):
self.is_photo = False
if 'cameras' in glob.conf:
if 'EXIF:Model' in self.exif:
if self.exif['EXIF:Model'] in glob.conf['cameras']:
self.is_photo = True
if 'copyright' in glob.conf:
if 'IPTC:CopyrightNotice' in self.exif:
for s in glob.conf['copyright']:
pattern = re.compile(r'%s' % s)
if pattern.search(self.exif['IPTC:CopyrightNotice']):
self.is_photo = True
if self.is_photo:
#self.category = "photo"
if not self.alttext:
keywords = ['XMP:Description', 'IPTC:Caption-Abstract']
for key in keywords:
if key in self.exif and self.exif[key]:
self.alttext = self.exif[key]
break
if not self.title:
keywords = ['XMP:Title', 'XMP:Headline', 'IPTC:Headline']
for key in keywords:
if key in self.exif and self.exif[key]:
self.title = self.exif[key]
break
def _is_downsizeable(self):
if self.what != 'jpeg' and self.what != 'png':
return False
if self.imgcl:
return False
return True
def _watermark(self, img):
wmark = wand.image.Image(filename=self.watermark)
if img.width > img.height:
w = img.width * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - w - (img.width * 0.01)
y = img.height - h - (img.height * 0.01)
else:
w = img.height * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - h - (img.width * 0.01)
y = img.height - w - (img.height * 0.01)
w = round(w)
h = round(h)
x = round(x)
y = round(y)
wmark.resize(w, h)
if img.width < img.height:
wmark.rotate(-90)
img.composite(image=wmark, left=x, top=y)
return img
def _sourceurlmark(self, img):
with wand.drawing.Drawing() as draw:
draw.fill_color = wand.color.Color('#fff')
draw.fill_opacity = 0.8
draw.stroke_color = wand.color.Color('#fff')
draw.stroke_opacity = 0.8
r_h = round(img.height * 0.3)
r_top = round((img.height/2) - (r_h/2))
draw.rectangle(
left=0,
top=r_top,
width=img.width,
height=r_h
)
draw(img)
with wand.drawing.Drawing() as draw:
draw.font = os.path.join(glob.FONT)
draw.font_size = round((img.width)/len(self.linkto)*1.5)
draw.gravity = 'center'
draw.text(
0,
0,
self.linkto
)
draw(img)
return img
def _copy(self):
p = self.c + self.fext
if not os.path.isfile(p):
logging.debug("copying %s" % self.fpath)
shutil.copy(self.fpath, p)
return
def _isneeded(self):
# skip existing
needed = False
if glob.REGENERATE:
needed = True
else:
for size, meta in self.sizes.items():
if not os.path.isfile(meta['path']):
needed = True
return needed
def _intermediate_dimensions(self, img, size, meta):
if (img.width > img.height and 'crop' not in meta) \
or (img.width < img.height and 'crop' in meta):
width = size
height = int(float(size / img.width) * img.height)
else:
height = size
width = int(float(size / img.height) * img.width)
return (width, height)
def _intermediate_symlink(self, meta):
# create a symlink to the largest resize with the full filename;
# this is to ensure backwards compatibility and avoid 404s
altsrc = meta['path']
altdst = self.c + self.fext
if not os.path.islink(altdst):
if os.path.isfile(altdst):
os.unlink(altdst)
os.symlink(altsrc, altdst)
def _intermediate(self, img, size, meta, liquidcrop=True):
# skip existing unless regenerate needed
if os.path.isfile(meta['path']) and not glob.REGENERATE:
return
# too small images: move on
#if size > img.height and size > img.width:
# return
width, height = self._intermediate_dimensions(img, size, meta)
try:
thumb = img.clone()
thumb.resize(width, height)
#thumb.resize(width, height, filter='robidouxsharp')
if 'crop' in meta and liquidcrop:
thumb.liquid_rescale(size, size, 1, 1)
elif 'crop' in meta:
l = t = 0
if width > size:
l = int((width - size) / 2)
if height > size:
t = int((height - size) / 2)
thumb.crop(left=l, top=t, width=size, height=size)
if img.format == "PNG":
wand.api.library.MagickSetCompressionQuality(img.wand, 75)
if img.format == "JPEG":
thumb.compression_quality = 86
thumb.unsharp_mask(radius=0, sigma=0.5, amount=1, threshold=0.03)
thumb.format = 'pjpeg'
# this is to make sure pjpeg happens
with open(meta['path'], 'wb') as f:
thumb.save(file=f)
if size == list(self.sizes.keys())[-1]:
self._intermediate_symlink(meta)
#if img.format == "JPEG":
## this one strips the embedded little jpg
#call(['/usr/bin/jhead', '-dt', '-q', cpath])
except:
print("Unexpected error:", sys.exc_info()[0])
raise
def mksrcset(self, generate_caption=True, uphoto=False):
if not self._is_downsizeable():
return False
for size, meta in self.sizes.items():
if 'crop' in meta:
continue
# increase fallback until max fallback reached
if size <= self.fallbacksize:
fallback = meta['url']
# set target for the largest
target = meta['url']
if uphoto:
uphotoclass=' u-photo'
else:
uphotoclass=''
caption = ''
if not self.imgcl:
cl = ''
else:
cl = self.imgcl
if self.alttext \
and similar_text(self.alttext, self.fname) < 90 \
and similar_text(self.alttext, self.fname + '.' + self.fext) < 90 \
and generate_caption:
caption = '<figcaption class=\"caption\">%s</figcaption>' % (self.alttext)
if self.linkto:
target = self.linkto
return '<figure class="photo"><a target="_blank" class="adaptive%s" href="%s"><img src="%s" class="adaptimg %s" alt="%s" /></a>%s</figure>' % (uphotoclass, target, fallback, self.imgcl, self.alttext, caption)

203
nasg.py Normal file

@@ -0,0 +1,203 @@
import argparse
import logging
import os
import re
import arrow
import atexit
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
from slugify import slugify
import nasg.config as config
import nasg.singular as singular
import nasg.searchindex as searchindex
import nasg.taxonomy as taxonomy
from pprint import pprint
parser = argparse.ArgumentParser(description='Parameters for NASG')
parser.add_argument(
'--regenerate', '-f',
dest='regenerate',
action='store_true',
default=False,
help='force regeneration of all HTML outputs'
)
parser.add_argument(
'--downsize', '-c',
action='store_true',
dest='downsize',
default=False,
help='force re-downsizing of all suitable images'
)
parser.add_argument(
'--debug', '-d',
action='store_true',
dest='debug',
default=False,
help='turn on debug log'
)
class Engine(object):
def __init__(self):
self._initdirs()
self._lock()
atexit.register(self._lock, action='clear')
self.files = []
self.categories = {}
self.tags = {}
self.allposts = taxonomy.TaxonomyHandler('')
self.frontposts = taxonomy.TaxonomyHandler('')
self.allowedpattern = re.compile(config.accept_sourcefiles)
self.counter = {}
def _parse_results(self, futures):
for future in futures:
try:
future.result()
except Exception as e:
logging.error("processing failed: %s", e)
def collect(self):
self._setup_categories()
self._setup_singulars()
def render(self):
self._render_singulars()
#self._render_taxonomy()
def _render_singulars(self):
logging.warning("rendering singulars")
pprint(self.allposts)
#futures = []
#with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
for p in self.allposts:
#futures.append(executor.submit(p.write))
p.write()
#for future in futures:
#try:
#future.result()
#except Exception as e:
#logging.error("processing failed: %s", e)
def _render_taxonomy(self):
futures = []
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
for tslug, t in self.tags.items():
#t.write()
futures.append(executor.submit(t.write))
for cslug, c in self.categories.items():
#c.write()
futures.append(executor.submit(c.write))
#self.frontposts.write()
futures.append(executor.submit(self.frontposts.write))
self._parse_results(futures)
def _setup_categories(self):
for cat, meta in config.categories.items():
cpath = os.path.join(config.CONTENT, cat)
if not os.path.isdir(cpath):
logging.error("category %s not found at: %s", cat, cpath)
continue
self.categories[cat] = taxonomy.TaxonomyHandler(
meta.get('name', cat),
taxonomy=meta.get('type', 'category'),
slug=cat,
render=meta.get('render', True)
)
def _setup_singulars(self):
futures = []
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
for slug, tax in self.categories.items():
cpath = os.path.join(config.CONTENT, slug)
for f in os.listdir(cpath):
fpath = os.path.join(cpath,f)
if not self.allowedpattern.fullmatch(f):
logging.warning("unexpected file at: %s" % fpath)
continue
#self._posttype(fpath, slug)
futures.append(executor.submit(self._posttype, fpath, slug))
self._parse_results(futures)
def _posttype(self, fpath, cat):
c = self.categories[cat]
if re.match(r'.*\.jpg', fpath):
p = singular.PhotoHandler(fpath)
elif 'page' == c.taxonomy:
p = singular.PageHandler(fpath)
else:
p = singular.ArticleHandler(fpath)
c.append(p)
self.allposts.append(p)
front = config.categories[cat].get('front', True)
if front:
self.frontposts.append(p)
ptags = p.vars.get('tags', [])
for tag in ptags:
tslug = slugify(tag, only_ascii=True, lower=True)
if tslug not in self.tags:
self.tags[tslug] = taxonomy.TaxonomyHandler(
tag,
taxonomy='tag',
slug=tslug
)
self.tags[tslug].append(p)
def _initdirs(self):
for d in [
config.TARGET,
config.TTHEME,
config.TFILES,
config.VAR,
config.SEARCHDB,
config.TSDB,
config.LOGDIR
]:
if not os.path.exists(d):
os.mkdir(d)
def _lock(self, action='set'):
if 'set' == action:
if os.path.exists(config.LOCKFILE):
raise ValueError("lockfile %s present" % config.LOCKFILE)
with open(config.LOCKFILE, "wt") as l:
l.write("%s" % arrow.utcnow())
l.close()
elif 'clear' == action:
if os.path.exists(config.LOCKFILE):
os.unlink(config.LOCKFILE)
else:
return os.path.exists(config.LOCKFILE)
if __name__ == '__main__':
config.options.update(vars(parser.parse_args()))
loglevel = 30
if config.options['debug']:
loglevel = 10
while len(logging.root.handlers) > 0:
logging.root.removeHandler(logging.root.handlers[-1])
logging.basicConfig(
level=loglevel,
format='%(asctime)s - %(levelname)s - %(message)s'
)
engine = Engine()
engine.collect()
engine.render()

0
nasg/__init__.py Normal file

115
nasg/cmdline.py Normal file

@@ -0,0 +1,115 @@
import subprocess
import os
import json
import logging
class CommandLine(object):
def __init__(self, cmd, stdin=''):
self.cmd = cmd.split(' ')
self.stdin = stdin
self.stdout = ''
self.binary = None
self._which()
if not self.binary:
raise ValueError('%s binary was not found in PATH' % self.cmd[0])
# based on: http://stackoverflow.com/a/377028/673576
def _which(self):
if self._is_exe(self.cmd[0]):
self.binary = self.cmd[0]
return
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
fpath = os.path.join(path, self.cmd[0])
if self._is_exe(fpath):
self.binary = self.cmd[0] = fpath
return
def _is_exe(self, fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
def run(self):
p = subprocess.Popen(
self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy()
)
stdout, stderr = p.communicate(self.stdin.encode('utf-8'))
self.stdout = stdout.decode('utf-8').strip()
return self
class Exiftool(CommandLine):
def __init__(self, fpath = ''):
self.fpath = fpath
cmd ="/usr/local/bin/exiftool -json -sort -groupNames %s" % (fpath)
super(Exiftool, self).__init__(cmd)
def get(self):
self.run()
exif = {}
try:
exif = json.loads(self.stdout)[0]
except json.JSONDecodeError as e:
logging.error("Error when decoding JSON returned from exiftool: %s" % e)
pass
return exif
class Pandoc(CommandLine):
""" Use: Pandoc.[formatter function].get()
available formatter functions:
- md2html: from markdown extra to html5
- html2md: from html5 to simple markdown
The default is plain markdown to html5 (if no formatter function added)
"""
def __init__(self, text):
self.stdin = text
self.format_in = 'markdown'
self.format_out = 'html5'
self.stdout = ''
def md2html(self):
self.format_in = "markdown+" + "+".join([
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
])
return self
def html2md(self):
self.format_out = "markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
])
return self
def get(self):
cmd = "/usr/bin/pandoc -o- --from=%s --to=%s" % (self.format_in, self.format_out)
super(Pandoc, self).__init__(cmd, stdin=self.stdin)
self.run()
return self.stdout
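A short sketch of how these wrappers are meant to be chained; the photo path below is an assumption, not something this commit ships:

from nasg.cmdline import Pandoc, Exiftool

# markdown extra to html5, per the Pandoc docstring above
html = Pandoc('# Hello *world*').md2html().get()

# html5 back to simplified markdown
md = Pandoc('<h1>Hello <em>world</em></h1>').html2md().get()

# EXIF as a dict; returns {} when exiftool prints no valid JSON
exif = Exiftool('/path/to/photo.jpg').get()   # hypothetical path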

21
nasg/func.py Normal file
View file

@@ -0,0 +1,21 @@
import re
def gps2dec(exifgps, ref=None):
pattern = re.compile(r"(?P<deg>[0-9.]+)\s+deg\s+(?P<min>[0-9.]+)'\s+(?P<sec>[0-9.]+)\"(?:\s+(?P<dir>[NEWS]))?")
v = pattern.match(exifgps).groupdict()
dd = float(v['deg']) + (((float(v['min']) * 60) + (float(v['sec']))) / 3600)
if ref == 'West' or ref == 'South' or v['dir'] == "S" or v['dir'] == "W":
dd = dd * -1
return round(dd, 6)
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
""" Used to create short, lowecase slug for a number (an epoch) passed """
num = int(num)
return ((num == 0) and numerals[0]) or (
baseN(
num // b,
b,
numerals
).lstrip(numerals[0]) + numerals[num % b]
)
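For reference, a worked example of the two helpers, using the same values as the tests in nasg/tests/func.py:

from nasg.func import gps2dec, baseN

# 103 deg 52' 32.79" W  ->  -(103 + 52/60 + 32.79/3600) = -103.875775
print(gps2dec('103 deg 52\' 32.79" W'))   # -103.875775

# epoch seconds encoded as a short base36 slug
print(baseN(1489437846))                  # 'omrtli'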

297
nasg/img.py Normal file

@@ -0,0 +1,297 @@
import os
import re
import shutil
import logging
import imghdr
from similar_text import similar_text
import wand.api
import wand.image
import wand.drawing
import wand.color
import nasg.config as config
from nasg.cmdline import Exiftool
class ImageHandler(object):
sizes = {
90: {
'ext': 's',
'crop': True,
},
360: {
'ext': 'm',
},
720: {
'ext': 'z',
'fallback': True
},
1280: {
'ext': 'b',
}
}
def __init__(self, fpath, alttext='', title='', imgcl='', linkto=False):
logging.info("parsing image: %s" % fpath)
self.fpath = os.path.abspath(fpath)
self.fname, self.ext = os.path.splitext(os.path.basename(fpath))
self.linkto = linkto
self.alttext = alttext
self.title = title
self.imgcl = imgcl
self.what = imghdr.what(self.fpath)
self.mime = "image/%s" % (self.what)
self.exif = {}
self.is_photo = False
if self.what == 'jpeg':
self._setexif()
self._is_photo()
self.is_downsizeable = False
if not self.imgcl:
if self.what == 'jpeg' or self.what == 'png':
self.is_downsizeable = True
self.sizes = sorted(self.sizes.items())
for size, meta in self.sizes:
meta['fname'] = "%s_%s%s" % (
self.fname,
meta['ext'],
self.ext
)
meta['fpath'] = os.path.join(
config.TFILES,
meta['fname']
)
meta['url'] = "%s/%s/%s" % (
config.site['url'],
config.UFILES,
meta['fname']
)
if 'fallback' in meta:
self.fallback = meta['url']
self.targeturl = meta['url']
def featured(self):
# sizes elements are tuples: size, meta
return {
'mime': self.mime,
'url': self.sizes[-1][1]['url'],
'bytes': os.path.getsize(self.sizes[-1][1]['fpath'])
}
def _setexif(self):
self.exif = Exiftool(self.fpath).get()
def _is_photo(self):
model = self.exif.get('EXIF:Model', None)
if hasattr(config, 'cameras') and \
model in config.cameras:
self.is_photo = True
return
cprght = self.exif.get('IPTC:CopyrightNotice', '')
if hasattr(config, 'copyr'):
for s in config.copyr:
pattern = re.compile(r'%s' % s)
if pattern.match(cprght):
self.is_photo = True
return
def _watermark(self, img):
if 'watermark' not in config.options:
return img
if not os.path.isfile(config.options['watermark']):
return img
wmark = wand.image.Image(filename=config.options['watermark'])
if img.width > img.height:
w = img.width * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - w - (img.width * 0.01)
y = img.height - h - (img.height * 0.01)
else:
w = img.height * 0.16
h = wmark.height * (w / wmark.width)
x = img.width - h - (img.width * 0.01)
y = img.height - w - (img.height * 0.01)
w = round(w)
h = round(h)
x = round(x)
y = round(y)
wmark.resize(w, h)
if img.width < img.height:
wmark.rotate(-90)
img.composite(image=wmark, left=x, top=y)
return img
def _sourceurlmark(self, img):
with wand.drawing.Drawing() as draw:
draw.fill_color = wand.color.Color('#fff')
draw.fill_opacity = 0.8
draw.stroke_color = wand.color.Color('#fff')
draw.stroke_opacity = 0.8
r_h = round(img.height * 0.3)
r_top = round((img.height/2) - (r_h/2))
draw.rectangle(
left=0,
top=r_top,
width=img.width,
height=r_h
)
draw(img)
with wand.drawing.Drawing() as draw:
draw.font = config.FONT
draw.font_size = round((img.width)/len(self.linkto)*1.5)
draw.gravity = 'center'
draw.text(
0,
0,
self.linkto
)
draw(img)
return img
def downsize(self):
if not self.is_downsizeable:
return self._copy()
if not self._isneeded():
logging.debug("downsizing not needed for %s", self.fpath)
return
logging.debug("downsizing %s", self.fpath)
try:
img = wand.image.Image(filename=self.fpath)
img.auto_orient()
except ValueError as e:
logging.error("opening %s with wand failed: %s", self.fpath, e)
return
if self.is_photo:
img = self._watermark(img)
elif self.linkto:
img = self._sourceurlmark(img)
for size, meta in self.sizes:
self._intermediate(img, size, meta)
#self._setmeta()
def _copy(self):
target = os.path.join(
config.TFILES,
"%s%s" % (self.fname, self.ext)
)
if os.path.isfile(target) and \
not config.options['downsize']:
return
logging.debug("copying %s to %s", self.fpath, target)
shutil.copy(self.fpath, target)
def _isneeded(self):
if config.options['downsize']:
return True
for size, meta in self.sizes:
if not os.path.isfile(meta['fpath']):
return True
def _intermediate_dimensions(self, img, size, meta):
if (img.width > img.height and 'crop' not in meta) \
or (img.width < img.height and 'crop' in meta):
width = size
height = int(float(size / img.width) * img.height)
else:
height = size
width = int(float(size / img.height) * img.width)
return (width, height)
def _intermediate(self, img, size, meta):
if os.path.isfile(meta['fpath']) and \
not config.options['downsize']:
return
try:
thumb = img.clone()
width, height = self._intermediate_dimensions(img, size, meta)
thumb.resize(width, height)
if 'crop' in meta:
if 'liquidcrop' in config.options and \
config.options['liquidcrop']:
thumb.liquid_rescale(size, size, 1, 1)
else:
l = t = 0
if width > size:
l = int((width - size) / 2)
if height > size:
t = int((height - size) / 2)
thumb.crop(left=l, top=t, width=size, height=size)
if img.format == "JPEG":
thumb.compression_quality = 86
thumb.unsharp_mask(
radius=0,
sigma=0.5,
amount=1,
threshold=0.03
)
thumb.format = 'pjpeg'
# this is to make sure pjpeg happens
with open(meta['fpath'], 'wb') as f:
thumb.save(file=f)
except ValueError as e:
logging.error("error while downsizing %s: %s", self.fpath, e)
return
def srcset(self, generate_caption=True, uphoto=False):
if not self.is_downsizeable:
return False
uphotoclass=''
if uphoto:
uphotoclass=' u-photo'
cl = ''
if self.imgcl:
cl = self.imgcl
caption = ''
if self.alttext \
and similar_text(self.alttext, self.fname) < 90 \
and similar_text(self.alttext, self.fname + '.' + self.ext) < 90 \
and generate_caption:
caption = '<figcaption class=\"caption\">%s</figcaption>' % (self.alttext)
target = self.targeturl
if self.linkto:
target = self.linkto
# don't put linebreaks in this: Pandoc tends to evaluate them
return '<figure class="photo"><a target="_blank" class="adaptive%s" href="%s"><img src="%s" class="adaptimg %s" alt="%s" /></a>%s</figure>' % (
uphotoclass,
target,
self.fallback,
self.imgcl,
self.alttext,
caption
)
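A rough usage sketch of the handler above, assuming config.TFILES and config.site are configured and the input file exists (the path is made up):

from nasg.img import ImageHandler

img = ImageHandler('content/photo/example.jpg', alttext='an example photo')
img.downsize()                     # writes the _s/_m/_z/_b variants into config.TFILES
figure = img.srcset(uphoto=True)   # <figure> markup, or False if the image is not downsizeable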

0
nasg/img_test.py Normal file

29
nasg/jinjaenv.py Normal file

@@ -0,0 +1,29 @@
import arrow
import jinja2
from slugify import slugify
import nasg.config as config
JINJA2ENV = jinja2.Environment(
loader=jinja2.FileSystemLoader(
searchpath=config.TEMPLATES
)
)
def jinja_filter_date(d, form='%Y-%m-%d %H:%M:%S'):
if d == 'now':
return arrow.now().datetime.strftime(form)
if form == 'c':
form = '%Y-%m-%dT%H:%M:%S%z'
return d.strftime(form)
def jinja_filter_slugify(s):
return slugify(s, only_ascii=True, lower=True)
def jinja_filter_search(s, r):
if r in s:
return True
return False
JINJA2ENV.filters['date'] = jinja_filter_date
JINJA2ENV.filters['search'] = jinja_filter_search
JINJA2ENV.filters['slugify'] = jinja_filter_slugify
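The filters end up available inside every template; a minimal sketch, with an inline template string used purely for illustration:

import arrow
from nasg.jinjaenv import JINJA2ENV

tpl = JINJA2ENV.from_string('{{ published|date("c") }} {{ title|slugify }}')
print(tpl.render(published=arrow.utcnow().datetime, title='Árvíztűrő Tükörfúrógép'))
# e.g. 2017-05-23T10:13:35+0000 arvizturo-tukorfurogep (date value will differ)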

76
nasg/searchindex.py Normal file

@@ -0,0 +1,76 @@
from whoosh import fields
from whoosh import analysis
from whoosh import index
import os
import logging
import tempfile
import atexit
import shutil
import nasg.config as config
class SearchIndex(object):
schema = fields.Schema(
url=fields.ID(
stored=True,
),
title=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
date=fields.DATETIME(
stored=True,
sortable=True
),
content=fields.TEXT(
stored=True,
analyzer=analysis.FancyAnalyzer(
)
),
tags=fields.TEXT(
stored=True,
analyzer=analysis.KeywordAnalyzer(
lowercase=True,
commas=True
)
),
weight=fields.NUMERIC(
sortable=True
),
img=fields.TEXT(
stored=True
)
)
def __init__(self):
self.tmp = tempfile.mkdtemp('whooshdb_', dir=tempfile.gettempdir())
self.ix = index.create_in(self.tmp, self.schema)
atexit.register(self.cleanup)
def add(self, vars):
ix = self.ix.writer()
ix.add_document(
title=vars['title'],
url=vars['url'],
content=vars['content'],
date=vars['published'],
tags=vars['tags'],
weight=1,
img=vars['img']
)
ix.commit()
def cleanup(self):
if not os.path.exists(self.tmp):
return
logging.warning("cleaning up tmp whoosh")
shutil.rmtree(self.tmp)
def save(self):
logging.info("deleting old searchdb")
shutil.rmtree(config.SEARCHDB)
logging.info("moving new searchdb")
shutil.move(self.tmp, config.SEARCHDB)
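How the index is expected to be filled and persisted; the field values below are placeholders:

import arrow
from nasg.searchindex import SearchIndex

ix = SearchIndex()
ix.add({                      # keys read by add()
    'title': 'Example post',
    'url': 'https://example.net/example-post/',
    'content': 'searchable body text',
    'published': arrow.utcnow().datetime,
    'tags': 'example,post',
    'img': ''
})
ix.save()                     # swaps the temporary whoosh dir into config.SEARCHDB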

580
nasg/singular.py Normal file

@@ -0,0 +1,580 @@
import os
import re
import logging
import arrow
import frontmatter
import langdetect
from slugify import slugify
import nasg.config as config
import nasg.func as func
import nasg.cmdline as cmdline
from nasg.img import ImageHandler
import nasg.jinjaenv as jinjaenv
class SingularHandler(object):
def __init__(self, fpath):
logging.info("setting up singular from %s", fpath)
self.fpath= os.path.abspath(fpath)
self.fname, self.ext = os.path.splitext(os.path.basename(self.fpath))
self.target = os.path.join(
config.TARGET, "%s" % (self.fname), "index.html"
)
slug = slugify(self.fname, only_ascii=True, lower=True)
self.modtime = int(os.path.getmtime(self.fpath))
self.category = os.path.dirname(self.fpath).replace(config.CONTENT, '').strip('/')
self.vars = {
'category': self.category,
'tags': [],
'published': arrow.get(self.modtime),
'updated': arrow.get(0),
'author': config.author,
'title': '',
'raw_summary': '',
'raw_content': '',
'content': '',
'summary': '',
'reactions': {},
'exif': {},
'lang': config.site['lang'],
#'syndicate': [],
'slug': slug,
'shortslug': slug,
'srcset': '',
'url': "%s/%s/" % (config.site['url'], slug),
}
self.redirects = {}
self.pings = {}
self.template = 'singular.html'
self.img = None
self.rendered = ''
def __repr__(self):
return "Post '%s' (%s @ %s)" % (
self.vars['title'],
self.fname,
self.fpath
)
def _modtime(self):
""" Set file mtime in case it doesn't match the in-file publish or updated time """
use = 'published'
if self.vars['updated'].timestamp > self.vars['published'].timestamp:
use = 'updated'
self.modtime = int(self.vars[use].timestamp)
stattime = int(os.path.getmtime(self.fpath))
if stattime != self.modtime:
os.utime(self.fpath, (self.modtime, self.modtime))
def _detect_lang(self):
# try to detect language, ignore failures
try:
self.vars['lang'] = langdetect.detect(
"%s %s" % (
self.vars['title'],
self.vars['raw_content']
)
)
except:
pass
def _redirects(self):
if self.category in config.categories and \
'nocollection' in config.categories[self.category] and \
config.categories[self.category]['nocollection']:
return
self.redirects[self.vars['shortslug']] = 1
def _shortslug(self):
shortslug = func.baseN(self.vars['published'].timestamp)
self.vars['shortslug'] = shortslug
def _prerender(self):
for s in ['content', 'summary']:
self.vars[s] = cmdline.Pandoc(self.vars[s]).md2html().get()
def _postsetup(self):
for s in ['content', 'summary']:
if not self.vars[s]:
self.vars[s] = self.vars['raw_%s' % s]
self._modtime()
self._shortslug()
self._detect_lang()
self._redirects()
self._pings()
def _render(self):
self._prerender()
tmpl = jinjaenv.JINJA2ENV.get_template(self.template)
logging.info("rendering %s", self.fname)
tmplvars = {
'post': self.vars,
'site': config.site,
'taxonomy': {},
}
self.rendered = tmpl.render(tmplvars)
def _exists(self):
""" check if target exists and up to date """
if config.options['regenerate']:
logging.debug('REGENERATE active')
return False
if not os.path.isfile(self.target):
logging.debug('%s missing', self.target)
return False
ttime = os.stat(self.target)
if self.modtime == ttime.st_mtime:
logging.debug('%s exists and is up to date', self.target)
return True
return False
def write(self):
""" Write HTML file """
if self._exists():
logging.info("skipping existing %s", self.target)
return
self._render()
d = os.path.dirname(self.target)
if not os.path.isdir(d):
os.mkdir(d)
with open(self.target, "wt") as html:
logging.info("writing %s", self.target)
html.write(self.rendered)
html.close()
os.utime(self.target, (self.modtime, self.modtime))
def indexvars(self):
""" Return values formatter for search index """
c = "%s %s %s %s %s" % (
self.vars['slug'],
self.vars['raw_summary'],
self.vars['raw_content'],
self.vars['reactions'],
self.vars['exif']
)
#c = "%s %s" % (c, self._localcopy_include())
imgstr = ''
if self.img:
imgstr = self.img.srcset(generate_caption=False)
ivars = {
'title': self.vars['title'],
'url': self.vars['url'],
'content': c,
'date': self.vars['published'].datetime,
'tags': ",".join(self.vars['tags']),
'img': imgstr
}
return ivars
def _pings(self):
""" Extract all URLs that needs pinging """
urlregex = re.compile(
r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
)
urls = re.findall(urlregex, self.vars['raw_content'])
for r in self.vars['reactions'].items():
reactiontype, reactions = r
if isinstance(reactions, str):
urls.append(reactions)
elif isinstance(reactions, list):
urls = [*reactions, *urls]
#for s in self.syndicate.keys():
#matches.append('https://brid.gy/publish/%s' % (s))
urlredux = {}
for url in urls:
# exclude local matches
if config.site['domain'] in url:
continue
urlredux[url] = 1
self.pings = urlredux
def _c_adaptify_altfpath(self, fname):
for c, cmeta in config.categories.items():
tpath = os.path.join(config.CONTENT, c, fname)
if os.path.isfile(tpath):
return tpath
return None
def _c_adaptify(self):
""" Generate srcset for all suitable images """
linkto = False
isrepost = None
if len(self.vars['reactions'].keys()):
isrepost = list(self.vars['reactions'].keys())[0]
if isrepost and \
len(self.vars['reactions'][isrepost]) == 1:
linkto = self.vars['reactions'][isrepost][0]
p = re.compile(
r'(!\[(.*)\]\((?:\/(?:files|cache)'
r'(?:\/[0-9]{4}\/[0-9]{2})?\/(.*\.(?:jpe?g|png|gif)))'
r'(?:\s+[\'\"]?(.*?)[\'\"]?)?\)(?:\{(.*?)\})?)'
, re.IGNORECASE)
m = p.findall(self.vars['content'])
if not m:
return
for shortcode, alt, fname, title, cl in m:
fpath = os.path.join(config.SFILES, fname)
if not os.path.isfile(fpath):
fpath = self._c_adaptify_altfpath(fname)
if not fpath:
logging.error("missing image in %s: %s", self.fpath, fname)
continue
im = ImageHandler(
fpath,
alttext=alt,
title=title,
imgcl=cl,
linkto=linkto
)
im.downsize()
srcset = im.srcset()
if srcset:
self.vars['content'] = self.vars['content'].replace(
shortcode, srcset
)
del(im)
def _c_video(self):
""" [video] shortcode extractor """
p = re.compile(
r'(\[video mp4=\"(?:/(?:files|cache)\/(?P<vname>.*?))\"\]'
r'(?:\[/video\])?)'
)
videos = p.findall(self.vars['content'])
if not videos:
return
for shortcode, vidf in videos:
video = '<video controls><source src="%s/%s" type="video/mp4">Your browser does not support the video tag :(</video>' % (
config.site['url'],
vidf
)
self.vars['content'] = self.vars['content'].replace(shortcode, video)
def _c_snippets(self):
""" Replaces [git:(repo)/(file.ext)] with corresponding code snippet """
p = re.compile(r'(\[git:([^\/]+)\/([^\]]+\.([^\]]+))\])')
snippets = p.findall(self.vars['content'])
if not snippets:
return
for shortcode, d, f, ext in snippets:
fpath = os.path.join(config.SOURCE, d, f)
if not os.path.isfile(fpath):
logging.error("missing blogsnippet: %s", self.fpath)
continue
if re.compile(r'conf', re.IGNORECASE).match(ext):
lang = 'apache'
else:
lang = ext
with open(fpath, "rt") as snip:
c = snip.read()
snip.close()
c = "\n\n```%s\n%s\n```\n" % (lang, c)
logging.debug("replacing blogsnippet %s", self.fpath)
self.vars['content'] = self.vars['content'].replace(
shortcode, c
)
#def _c_files(self):
#""" Copy misc files referenced """
#match = re.compile(
#r'\s(?:%s)?/(?:files|cache)'
#r'/.*\.(?:(?!jpe?g|png|gif).*)\s' % (glob.conf['site']['domain'])
#)
#split = re.compile(
#r'\s(?:%s)?/((?:files|cache)'
#r'/(.*\.(?:(?!jpe?g|png|gif).*)))\s' % (glob.conf['site']['domain'])
#)
##files = re.findall(match, self.content)
##print(files)
class ArticleHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(ArticleHandler, self).__init__(*args, **kwargs)
self._setup()
def _setup(self):
post = frontmatter.load(self.fpath)
self.vars['raw_content'] = "%s" % post.content
self.vars['content'] = "%s" % post.content
if 'tags' in post.metadata:
self.vars['tags'] = post.metadata['tags']
if 'title' in post.metadata:
self.vars['title'] = post.metadata['title']
if 'published' in post.metadata:
self.vars['published'] = arrow.get(post.metadata['published'])
if 'updated' in post.metadata:
self.vars['updated'] = arrow.get(post.metadata['updated'])
if 'summary' in post.metadata:
self.vars['raw_summary'] = post.metadata['summary']
self.vars['summary'] = "%s" % post.metadata['summary']
if 'redirect' in post.metadata and \
isinstance(post.metadata['redirect'], list):
for r in post.metadata['redirect']:
self.redirects[r.strip().strip('/')] = 1
#if 'syndicate' in post.metadata:
#z = post.metadata['syndicate']
#if isinstance(z, str):
#self.syndicate[z] = ''
#elif isinstance(z, dict):
#for s, c in z.items():
#self.syndicate[s] = c
#elif isinstance(z, list):
#for s in z:
#self.syndicate[s] = ''
self.vars['reactions'] = {}
# getting rid of '-' to avoid css trouble and similar
rmap = {
'bookmark-of': 'bookmark',
'repost-of': 'repost',
'in-reply-to': 'reply',
}
for x in rmap.items():
key, replace = x
if key in post.metadata:
if isinstance(post.metadata[key], str):
self.vars['reactions'][replace] = [post.metadata[key]]
elif isinstance(post.metadata[key], list):
self.vars['reactions'][replace] = post.metadata[key]
self._c_adaptify()
self._c_snippets()
self._c_video()
#self._files()
super(ArticleHandler, self)._postsetup()
class PhotoHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PhotoHandler, self).__init__(*args, **kwargs)
self.img = ImageHandler(self.fpath)
self._setup()
def _setvars(self):
mapping = {
'camera': [
'EXIF:Model'
],
'aperture': [
'EXIF:FNumber',
'Composite:Aperture'
],
'shutter_speed': [
'EXIF:ExposureTime'
],
'focallength': [
'EXIF:FocalLength',
'Composite:FocalLength35efl',
],
'iso': [
'EXIF:ISO'
],
'lens': [
'Composite:LensID',
'MakerNotes:Lens',
'Composite:LensSpec'
]
}
for ekey, candidates in mapping.items():
for candidate in candidates:
val = self.img.exif.get(candidate, None)
if val:
self.vars['exif'][ekey] = val
break
gps = ['Latitude', 'Longitude']
for g in gps:
gk = 'EXIF:GPS%s' % (g)
if gk not in self.img.exif:
continue
r = 'EXIF:GPS%sRef' % (g)
ref = None
if r in self.img.exif:
ref = self.img.exif[r]
self.vars['exif']['geo_%s' % (g.lower())] = func.gps2dec(
self.img.exif[gk],
ref
)
def _setfromexif_str(self, varkey, exifkeys):
for key in exifkeys:
val = self.img.exif.get(key, None)
if not val:
continue
self.vars[varkey] = val.strip()
return
def _setfromexif_lst(self, varkey, exifkeys):
collected = {}
for key in exifkeys:
val = self.img.exif.get(key, None)
if not val:
continue
if isinstance(val, str):
self.img.exif[key] = val.split(",")
# not elif: the previous branch converts any string into a list
# and we rely on that here
if isinstance(val, list):
for v in val:
collected[slugify(str(v).strip())] = str(v).strip()
self.vars[varkey] = collected.values()
return
def _setfromexif_date(self, varkey, exifkeys):
pattern = re.compile(
"(?P<Y>[0-9]{4}):(?P<M>[0-9]{2}):(?P<D>[0-9]{2})\s+"
"(?P<T>[0-9]{2}:[0-9]{2}:[0-9]{2})Z?"
)
for key in exifkeys:
if key not in self.img.exif:
continue
if not self.img.exif[key]:
continue
date = None
v = pattern.match(self.img.exif[key]).groupdict()
if not v:
continue
try:
date = arrow.get('%s-%s-%s %s' % (v['Y'], v['M'], v['D'], v['T']))
except:
continue
if not date:
continue
self.vars['published'] = date
logging.debug("'published' set to %s from key %s", self.vars['published'], key)
return
def _setup(self):
self._setfromexif_str('title', [
'XMP:Title',
'XMP:Headline',
'IPTC:Headline'
])
self._setfromexif_str('raw_content', [
'XMP:Description',
'IPTC:Caption-Abstract'
])
self._setfromexif_lst('tags', [
'XMP:Keywords',
'IPTC:Keywords'
])
self._setfromexif_date('published', [
'XMP:DateTimeDigitized',
'XMP:CreateDate',
'EXIF:CreateDate',
'EXIF:ModifyDate'
])
self._setvars()
self.img.title = self.vars['title']
self.img.alttext = self.vars['title']
self.vars['content'] = "%s\n\n%s" % (
self.vars['raw_content'],
self.img.srcset(generate_caption=False, uphoto=True)
)
self.img.downsize()
self.vars['img'] = self.img.featured()
super(PhotoHandler, self)._postsetup()
class PageHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PageHandler, self).__init__(*args, **kwargs)
self.template = 'page.html'
self._setup()
def _setup(self):
with open(self.fpath) as c:
self.vars['raw_content'] = c.read()
c.close()
self._c_adaptify()
super(PageHandler, self)._postsetup()
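A sketch of the intended flow for one markdown entry; the file path and frontmatter below are invented for illustration:

# hypothetical source: content/article/example.md
# ---
# title: Example
# published: 2017-05-23T11:13:35+01:00
# tags: [example]
# ---
# Body text in markdown.

from nasg.singular import ArticleHandler

post = ArticleHandler('content/article/example.md')
post.write()                          # renders singular.html into config.TARGET/example/index.html
print(post.vars['url'], post.vars['shortslug'])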

319
nasg/taxonomy.py Normal file

@@ -0,0 +1,319 @@
import math
import logging
import os
import collections
from slugify import slugify
import nasg.config as config
import nasg.jinjaenv as jinjaenv
import arrow
class TaxonomyHandler(object):
def __init__(self, name, taxonomy='category', slug='', description='', render=True):
logging.info("setting up taxonomy: %s", name)
self.name = name
self.taxonomy = taxonomy
self.description = description
self.render = render
if slug:
self.slug = slug
else:
self.slug = slugify(self.name, only_ascii=True, lower=True)
self.posts = collections.OrderedDict()
#self.basedir = os.path.join(config.TARGET, self.taxonomy, self.slug)
if len(self.taxonomy) and len(self.name):
self.basedir = os.path.join(config.TARGET, self.taxonomy, self.slug)
self.baseurl = "/%s/%s/" % (self.taxonomy, self.slug)
else:
self.baseurl = '/'
self.basedir = os.path.join(config.TARGET)
self.modtime = 0
def __getitem__(self, key):
return self.posts[key]
def __repr__(self):
return 'Taxonomy %s (name: %s, slug: %s) with %i posts' % (
self.taxonomy,
self.name,
self.slug,
len(self.posts)
)
def __next__(self):
try:
r = self.posts.next()
except:
raise StopIteration()
return r
def __iter__(self):
for ix, post in self.posts.items():
yield post
return
def append(self, post):
k = int(post.vars['published'].timestamp)
if k in self.posts:
logging.error("colliding post timestamps: %s vs %s", self.posts[k].fpath, post.fpath)
inc = 1
while k in self.posts:
k = int(k+1)
self.posts[k] = post
self.posts = collections.OrderedDict(sorted(self.posts.items(), reverse=True))
def write(self):
if not self.render:
return
l = list(self.posts.keys())
if len(l):
self.modtime = max(list(self.posts.keys()))
else:
self.modtime = arrow.utcnow().timestamp
self._write_pages()
self._write_rss()
def _page_vars(self, page, pages, start, end):
return {
'taxonomy': {
'url': self.baseurl,
'name': self.name,
'taxonomy': self.taxonomy,
'description': self.description,
'paged': page,
'total': pages,
'perpage': int(config.site['pagination']),
},
'site': config.site,
'posts': [self.posts[k].vars for k in list(sorted(
self.posts.keys(), reverse=True))[start:end]],
}
def _write_file(self, fpath, template, tvars):
tmpl = jinjaenv.JINJA2ENV.get_template(template)
logging.info("writing %s" % (fpath))
with open(fpath, "wt") as f:
r = tmpl.render(tvars)
f.write(r)
f.close()
os.utime(fpath, (self.modtime, self.modtime))
def _write_rss(self):
rssdir = os.path.join(self.basedir, 'feed')
if not os.path.isdir(rssdir):
os.makedirs(rssdir)
fpath = os.path.join(rssdir, 'index.xml')
tvars = self._page_vars(1, 1, 0, int(config.site['rsspagination']))
self._write_file(fpath, 'rss.html', tvars)
def _write_page(self, page, pages, start, end):
if 1 == page:
pagedir = self.basedir
else:
pagedir = os.path.join(self.basedir, 'page', "%i" % page)
if not os.path.isdir(pagedir):
os.makedirs(pagedir)
fpath = os.path.join(pagedir, 'index.html')
tvars = self._page_vars(page, pages, start, end)
self._write_file(fpath, 'archive.html', tvars)
def _write_pages(self):
perpage = int(config.site['pagination'])
pages = math.ceil(len(self.posts)/perpage)
page = 1
while page <= pages:
start = int((page-1) * perpage)
end = int(start+perpage)
self._write_page(page, pages, start, end)
page += 1
#def _test_freshness(self):
#t, lp = list(self.posts.items())[0]
#self.lptime = lp.ftime.st_mtime
#if os.path.isfile(self.indexpath):
#p = self.indexpath
#elif os.path.isfile(self.simplepath):
#p = self.simplepath
#else:
#return False
#itime = os.stat(p)
#if itime.st_mtime == self.lptime and not glob.FORCEWRITE:
#logging.debug(
#'Taxonomy tree is fresh for %s' % (self.name)
#)
#return True
#return False
#def _test_dirs(self):
#if not os.path.isdir(self.taxp):
#os.mkdir(self.taxp)
#if not os.path.isdir(self.basep):
#os.mkdir(self.basep)
#def write_paginated(self):
#if self._test_freshness():
#return
#self._test_dirs()
#taxp = os.path.join(glob.TARGET, self.taxonomy)
#basep = os.path.join(glob.TARGET, self.taxonomy, self.slug)
#if not os.path.isdir(taxp):
#os.mkdir(taxp)
#if not os.path.isdir(basep):
#os.mkdir(basep)
#pages = math.ceil(len(self.posts) / glob.conf['perpage'])
#page = 1
#if len(self.taxonomy) and len(self.slug):
#base_url = "/%s/%s/" % (self.taxonomy, self.slug)
#else:
#base_url = '/'
#while page <= pages:
#start = int((page-1) * int(glob.conf['perpage']))
#end = int(start + int(glob.conf['perpage']))
#dorss = False
#posttmpls = [self.posts[k].tmpl() for k in list(sorted(
#self.posts.keys(), reverse=True))[start:end]]
#if page == 1:
#tpath = self.indexpath
#do_rss = True
## RSS
#else:
#do_rss = False
#if not os.path.isdir(self.pagedp):
#os.mkdir(self.pagedp)
#tdir = os.path.join(self.pagedp, "%d" % page)
#if not os.path.isdir(tdir):
#os.mkdir(tdir)
#tpath = os.path.join(tdir, "index.html")
#tvars = {
#'taxonomy': {
#'url': base_url,
#'name': self.name,
#'taxonomy': self.taxonomy,
#'description': self.description,
#'paged': page,
#'total': pages,
#'perpage': glob.conf['perpage'],
#},
#'site': glob.conf['site'],
#'posts': posttmpls,
#}
#tmpl = glob.jinja2env.get_template('archive.html')
#logging.info("rendering %s" % (tpath))
#with open(tpath, "w") as html:
#r = tmpl.render(tvars)
#soup = BeautifulSoup(r, "html5lib")
#r = soup.prettify()
#logging.info("writing %s" % (tpath))
#html.write(r)
#html.close()
#os.utime(tpath, (self.lptime, self.lptime))
#if do_rss:
#feeddir = os.path.join(self.basep, 'feed')
#if not os.path.isdir(feeddir):
#os.mkdir(feeddir)
#feedpath = os.path.join(feeddir, "index.xml")
#tmpl = glob.jinja2env.get_template('rss.html')
#logging.info("rendering %s" % (feedpath))
#with open(feedpath, "w") as html:
#r = tmpl.render(tvars)
#logging.info("writing %s" % (feedpath))
#html.write(r)
#html.close()
#os.utime(feedpath, (self.lptime, self.lptime))
#page = page+1
#def write_simple(self, template='archive.html'):
#if self._test_freshness():
#return
#self._test_dirs()
#base_url = "/%s/" % (self.slug)
#posttmpls = [self.posts[k].tmpl() for k in list(sorted(
#self.posts.keys(), reverse=True))]
#tvars = {
#'taxonomy': {
#'url': base_url,
#'name': self.name,
#'taxonomy': self.taxonomy,
#'description': self.description,
#'paged': 0,
#'total': 0,
#'perpage': glob.conf['perpage'],
#},
#'site': glob.conf['site'],
#'posts': posttmpls,
#}
#with open(os.path.join(self.simplepath), "w") as html:
#html.write(json.dumps(tvars, indent=4, sort_keys=True, default=str))
#html.close()
##tmpl = glob.jinja2env.get_template('gallery.html')
##logging.info("rendering %s" % (indexpath))
##with open(indexpath, "w") as html:
##r = tmpl.render(tvars)
##soup = BeautifulSoup(r, "html5lib")
##r = soup.prettify()
##logging.info("writing %s" % (indexpath))
##html.write(r)
##html.close()
##os.utime(indexpath, (lptime, lptime))
#def writesitemap(self):
#sitemap = "%s/sitemap.txt" % (glob.TARGET)
#urls = []
#for p in self.posts.items():
#t, data = p
#urls.append( "%s/%s" % ( glob.conf['site']['url'], data.slug ) )
#with open(sitemap, "w") as f:
#logging.info("writing %s" % (sitemap))
#f.write("\n".join(urls))
#f.close()
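How the handler above ties posts together; a sketch assuming the SingularHandler objects have already been collected elsewhere:

from nasg.taxonomy import TaxonomyHandler

posts = []                    # assumed: SingularHandler instances collected by nasg.Engine
tag = TaxonomyHandler('Example tag', taxonomy='tag', slug='example-tag')
for post in posts:
    tag.append(post)          # keyed and ordered by the post's published timestamp
tag.write()                   # paginated archive.html pages plus feed/index.xml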

26
nasg/tests/cmdline.py Normal file

@@ -0,0 +1,26 @@
import unittest
import nasg.cmdline as cmdline
class Test(unittest.TestCase):
def testException(self):
self.assertRaises(
ValueError,
cmdline.CommandLine,
'12345678'
)
def testOK(self):
self.assertEqual(
cmdline.CommandLine('ls ./test_cmdline.py').run().stdout,
'./test_cmdline.py'
)
def testExiftool(self):
self.assertEqual(
cmdline.Exiftool().get(),
{}
)
if __name__ == '__main__':
unittest.main()

60
nasg/tests/func.py Normal file

@@ -0,0 +1,60 @@
import unittest
import nasg.func as func
class Test(unittest.TestCase):
def test_baseN_zero(self):
self.assertEqual(
func.baseN(0),
'0'
)
def test_baseN(self):
self.assertEqual(
func.baseN(1489437846),
'omrtli'
)
def test_gps2dec_W(self):
self.assertEqual(
func.gps2dec(
'103 deg 52\' 32.79" W'
),
-103.875775
)
def test_gps2dec_E(self):
self.assertEqual(
func.gps2dec(
'103 deg 52\' 32.79" E'
),
103.875775
)
def test_gps2dec_N(self):
self.assertEqual(
func.gps2dec(
'33 deg 9\' 34.93" N'
),
33.159703
)
def test_gps2dec_S(self):
self.assertEqual(
func.gps2dec(
'33 deg 9\' 34.93" S'
),
-33.159703
)
def test_gps2dec(self):
self.assertEqual(
func.gps2dec(
'33 deg 9\' 34.93"'
),
33.159703
)
if __name__ == '__main__':
unittest.main()

36
nasg/tests/jinjaenv.py Normal file

@@ -0,0 +1,36 @@
import unittest
import nasg.jinjaenv as jinjaenv
import arrow
class CommandLineTest(unittest.TestCase):
def test_jinja_filter_date(self):
t = arrow.utcnow()
self.assertEqual(
jinjaenv.jinja_filter_date(t.datetime, 'c'),
t.format('YYYY-MM-DDTHH:mm:ssZ')
)
def test_jinja_filter_slugify(self):
self.assertEqual(
jinjaenv.jinja_filter_slugify('Árvíztűrő Tükörfúrógép'),
'arvizturo-tukorfurogep'
)
def test_jinja_filter_search1(self):
self.assertTrue(
jinjaenv.jinja_filter_search('almafa', 'alma')
)
def test_jinja_filter_search3(self):
self.assertTrue(
jinjaenv.jinja_filter_search( ['almafa' ], 'almafa')
)
def test_jinja_filter_search2(self):
self.assertFalse(
jinjaenv.jinja_filter_search('almafa', 'eszeveszett')
)
if __name__ == '__main__':
unittest.main()

10
nasg/tests/singular.py Normal file

@@ -0,0 +1,10 @@
import unittest
import nasg.singular as singular
class Test(unittest.TestCase):
def test(self):
self.assertEqual('','')
if __name__ == '__main__':
unittest.main()

10
nasg/tests/taxonomy.py Normal file

@@ -0,0 +1,10 @@
import unittest
import nasg.taxonomy as taxonomy
class Test(unittest.TestCase):
def test(self):
self.assertEqual('','')
if __name__ == '__main__':
unittest.main()

203
new.py

@@ -1,203 +0,0 @@
#!/home/petermolnar.net/.venv/bin/python3.5
"""Usage: new.py [-h] [-t TAGS] [-d DATE] [-s SLUG] [-l TITLE] [-b BOOKMARK] [-r REPLY] [-p REPOST] [-c CONTENT] [-u SUMMARY] [-i REDIRECT] [-a CATEGORY]
-h --help show this
-t --tags TAGS ';' separated, quoted list of tags
-d --date DATE YYYY-mm-ddTHH:MM:SS+TZTZ formatted date, if not now
-s --slug SLUG slug (normally autogenerated from title or pubdate)
-l --title TITLE title of new entry
-b --bookmark BOOKMARK URL to bookmark
-r --reply REPLY URL to reply to
-p --repost REPOST URL to repost
-c --content CONTENT content of entry
-u --summary SUMMARY summary of entry
-i --redirect REDIRECT ';' separated, quoted list of redirects
-a --category CATEGORY to put the content in this category
"""
import os
import sys
import datetime
import calendar
import logging
import json
import glob
import iso8601
import pytz
from docopt import docopt
from slugify import slugify
from ruamel import yaml
import singular
class ContentCreator(object):
def __init__(
self,
category='note',
tags=[],
date='',
slug='',
title='',
bookmark='',
reply='',
repost='',
content='',
summary='',
redirect=[]
):
self.category = category
if date:
self.date = iso8601.parse_date(date)
else:
self.date = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
self.time = calendar.timegm(self.date.timetuple())
self.title = title
if slug:
self.slug = slug
elif title:
self.slug = slugify(title, only_ascii=True, lower=True)
else:
self.slug = singular.SingularHandler.baseN(self.time)
self.tags = tags
self.bookmark = bookmark
self.reply = reply
self.repost = repost
if content:
self.content = content
else:
self.content = ''
self.summary = summary
self.redirect = redirect
self._makeyaml()
self._write()
def _makeyaml(self):
self.yaml = {
'published': self.date.strftime("%Y-%m-%dT%H:%M:%S%z")
}
if self.title:
self.yaml['title'] = self.title
if self.tags:
self.yaml['tags'] = self.tags
if self.bookmark:
self.yaml['bookmark-of'] = self.bookmark
if self.repost:
self.yaml['repost-of'] = self.repost
if self.reply:
self.yaml['in-reply-to'] = self.reply
if self.summary:
self.yaml['summary'] = self.summary
if self.redirect:
self.yaml['redirect'] = self.redirect
def _write(self):
fdir = os.path.join(glob.CONTENT, self.category)
if not os.path.isdir(fdir):
sys.exit("there is no category %s" % (self.category))
self.fpath = os.path.join(glob.CONTENT, self.category, "%s.md" % (self.slug))
self.out = "---\n" + yaml.dump(self.yaml, Dumper=yaml.RoundTripDumper) + "---\n\n" + self.content
with open(self.fpath, "w") as archive:
logging.info("writing %s", self.fpath)
logging.info("contents: %s", self.out)
archive.write(self.out)
archive.close()
class ParseCMDLine(object):
def __init__(self, arguments):
for x in ['--redirect', '--tags']:
if x in arguments and arguments[x]:
arguments[x] = arguments[x].split(";")
self.entry = ContentCreator(
category=arguments['--category'],
tags=arguments['--tags'],
date=arguments['--date'],
slug=arguments['--slug'],
title=arguments['--title'],
bookmark=arguments['--bookmark'],
reply=arguments['--reply'],
repost=arguments['--repost'],
content=arguments['--content'],
summary=arguments['--summary'],
redirect=arguments['--redirect']
)
if __name__ == '__main__':
args = docopt(__doc__, version='new.py 0.1')
with open(os.path.join(glob.CACHE, "slugs.json")) as sf:
slugs = json.loads(sf.read())
sf.close()
if not args['--category']:
c = 'note'
args['--category'] = input('Category [%s]: ' % (c)) or c
if not args['--date']:
d = datetime.datetime.utcnow().replace(tzinfo=pytz.utc).strftime("%Y-%m-%dT%H:%M:%S%z")
args['--date'] = input('Date [%s]' % (d)) or d
if not args['--title']:
args['--title'] = input('Title []:') or ''
if not args['--tags']:
args['--tags'] = input('Tags (separated by ;, no whitespace) []:') or []
if not args['--bookmark']:
args['--bookmark'] = input('Bookmark of URL []:') or ''
if not args['--reply']:
args['--reply'] = input('Reply to URL []:') or ''
if not args['--repost']:
args['--repost'] = input('Repost of URL []:') or ''
if not args['--slug']:
if args['--title']:
slug = slugify(args['--title'], only_ascii=True, lower=True)
elif args['--bookmark']:
slug = slugify("re: %s" % (args['--bookmark']), only_ascii=True, lower=True)
elif args['--reply']:
slug = slugify("re: %s" % (args['--reply']), only_ascii=True, lower=True)
elif args['--repost']:
slug = slugify("re: %s" % (args['--repost']), only_ascii=True, lower=True)
else:
d = iso8601.parse_date(args['--date'])
t = calendar.timegm(d.timetuple())
slug = singular.SingularHandler.baseN(t)
args['--slug'] = input('Slug [%s]:' % (slug)) or slug
if args['--slug'] in slugs:
logging.warning("This slug already exists: %s", args['--slug'])
slugbase = args['--slug']
inc = 1
while args['--slug'] in slugs:
args['--slug'] = "%s-%d" % (slugbase, inc)
inc = inc+1
logging.warning("Using %s as slug", args['--slug'])
if not args['--summary']:
args['--summary'] = input('Summary []:') or ''
if not args['--content']:
args['--content'] = input('Content []:') or ''
if not args['--redirect']:
args['--redirect'] = input('Additional slugs (separated by ;, no whitespace) []:') or []
p = ParseCMDLine(args)
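A hedged example of what the removed script produced; the values below are invented and ContentCreator is called directly instead of going through the prompts:

entry = ContentCreator(
    category='note',
    title='Example post',
    tags=['IT', 'python'],
    content='Hello world.',
)
# writes glob.CONTENT/note/example-post.md: the YAML front matter built in
# _makeyaml (published, title, tags) followed by the content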

View file

@@ -1,850 +0,0 @@
import glob
import asyncio
import uvloop
import os
import sys
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
from whoosh import index, qparser
import pynmea2
import datetime
import pytz
import re
import validators
import requests
import pypandoc
import hashlib
import time
from webmentiontools import urlinfo
import json
import calendar
import mimetypes
import singular
import urllib.parse
from ruamel import yaml
from slugify import slugify
import smtplib
import iso8601
import csv
import shutil
import collections
from git import Repo, Actor
import frontmatter
#import gzip
import arrow
class ToEmail(object):
def __init__(self, webmention):
self.webmention = webmention
self.set_html()
self.set_headers()
def set_html(self):
for authormeta in ['email', 'name', 'url']:
if not authormeta in self.webmention['author']:
self.webmention['author'][authormeta] = ''
html = """
<html>
<head></head>
<body>
<h1>
New %s
</h1>
<dl>
<dt>From</dt>
<dd>
<a href="%s">%s</a><br />
<a href="mailto:%s">%s</a>
</dd>
<dt>Source</dt>
<dd><a href="%s">%s</a></dd>
<dt>Target</dt>
<dd><a href="%s">%s</a></dd>
</dl>
%s
</body>
</html>""" % (
self.webmention['type'],
self.webmention['author']['url'],
self.webmention['author']['name'],
self.webmention['author']['email'],
self.webmention['author']['email'],
self.webmention['source'],
self.webmention['source'],
self.webmention['target'],
self.webmention['target'],
pypandoc.convert_text(
self.webmention['content'],
to='html5',
format="markdown+" + "+".join([
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
])
)
)
self.html = html
def set_headers(self):
""" Create and send email from a parsed webmention """
self.headers = {
'Content-Type': 'text/html; charset=utf-8',
'Content-Disposition': 'inline',
'Content-Transfer-Encoding': '8bit',
'Date': self.webmention['date'].strftime('%a, %d %b %Y %H:%M:%S %Z'),
'X-WEBMENTION-SOURCE': self.webmention['source'],
'X-WEBMENTION-TARGET': self.webmention['target'],
'From': glob.conf['from']['address'],
'To': glob.conf['to']['address'],
'Subject': "[webmention] from %s to %s" % ( self.webmention['source'], self.webmention['target'] ),
}
def send(self):
msg = ''
for key, value in self.headers.items():
msg += "%s: %s\n" % ( key, value )
msg += "\n%s\n" % self.html
try:
s = smtplib.SMTP( glob.conf['smtp']['host'], glob.conf['smtp']['port'] )
if glob.conf['smtp']['tls']:
s.ehlo()
s.starttls()
s.ehlo()
if glob.conf['smtp']['username'] and glob.conf['smtp']['password']:
s.login(glob.conf['smtp']['username'], glob.conf['smtp']['password'])
s.sendmail( self.headers['From'], [ self.headers['To'] ], msg.encode("utf8") )
s.quit()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
class MicropubHandler(object):
def __init__(self, request):
self.request = request
self.response = sanic.response.text("Unhandled error", status=500)
self.slug = ''
self.content = ''
self.category = 'note'
self.meta = {}
self.dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
logging.debug("incoming micropub request:")
logging.debug(self.request.body)
logging.debug("** args:")
logging.debug(self.request.args)
logging.debug("** query string:")
logging.debug(self.request.query_string)
logging.debug("** headers:")
logging.debug(self.request.headers)
with open(os.path.join(glob.CACHE, "tags.json"), "r") as db:
self.existing_tags = json.loads(db.read())
db.close()
self._parse()
def _verify(self):
if 'q' in self.request.args:
if 'config' in self.request.args['q']:
self.response = sanic.response.json({
'tags': self.existing_tags
}, status=200)
return
if 'syndicate-to' in self.request.args['q']:
self.response = sanic.response.json({
'syndicate-to': []
}, status=200)
return
if not 'access_token' in self.request.form:
self.response = sanic.response.text("Missing access token", status=401)
return
token = self.request.form.get('access_token')
verify = requests.get(
'https://tokens.indieauth.com/token',
allow_redirects=False,
timeout=10,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Bearer %s' % (token)
});
if verify.status_code != requests.codes.ok:
self.response = sanic.response.text("Could not verify access token", status=500)
return False
response = urllib.parse.parse_qs(verify.text)
logging.debug(response)
if 'scope' not in response or 'me' not in response:
self.response = sanic.response.text("Could not verify access token", status=401)
return False
if '%s/' % (glob.conf['site']['url'].rstrip('/')) not in response['me']:
self.response = sanic.response.text("You can't post to this domain.", status=401)
return False
if 'post' not in response['scope'] and 'create' not in response['scope']:
self.response = sanic.response.text("Invalid scope", status=401)
return False
return True
def _parse(self):
if not self._verify():
return
if len(self.request.files):
self.response = sanic.response.text("File handling is not yet done", status=501)
return
#for ffield in self.request.files.keys():
#logging.info("got file field: %s" % ffield)
#f = self.request.files.get(ffield)
#logging.info("mime is: %s" % f.type)
#logging.info("ext should be: %s" % mimetypes.guess_extension(f.type))
##f.body
##f.type
##logging.info( f )
self.meta['published'] = self.dt.strftime('%Y-%m-%dT%H:%M:%S%z')
slug = None
if 'content' in self.request.form and len(self.request.form.get('content')):
self.content = self.request.form.get('content')
if 'summary' in self.request.form and len(self.request.form.get('summary')):
self.meta['summary'] = self.request.form.get('summary')
if 'slug' in self.request.form and len(self.request.form.get('slug')):
slug = self.request.form.get('slug')
if 'name' in self.request.form and len(self.request.form.get('name')):
self.meta['title'] = self.request.form.get('name')
if not slug:
slug = self.meta['title']
if 'in-reply-to' in self.request.form and len(self.request.form.get('in-reply-to')):
self.meta['in-reply-to'] = self.request.form.get('in-reply-to')
if not slug:
slug = 're: %s' % (self.meta['in-reply-to'])
if 'repost-of' in self.request.form and len(self.request.form.get('repost-of')):
self.meta['repost-of'] = self.request.form.get('repost-of')
self.category = 'bookmark'
if not slug:
slug = '%s' % (self.meta['repost-of'])
if 'bookmark-of' in self.request.form and len(self.request.form.get('bookmark-of')):
self.meta['bookmark-of'] = self.request.form.get('bookmark-of')
self.category = 'bookmark'
if not slug:
slug = '%s' % (self.meta['bookmark-of'])
if 'category[]' in self.request.form:
self.meta['tags'] = list(self.request.form['category[]'])
if 'summary' in self.meta and ('IT' in self.meta['tags'] or 'it' in self.meta['tags']):
self.category = 'article'
elif 'summary' in self.meta and 'journal' in self.meta['tags']:
self.category = 'journal'
if not slug:
slug = singular.SingularHandler.baseN(calendar.timegm(self.dt.timetuple()))
self.slug = slugify(slug, only_ascii=True, lower=True)
self._write()
def _write(self):
fpath = os.path.join(glob.CONTENT, self.category, '%s.md' % (self.slug))
if os.path.isfile(fpath):
self.response = sanic.response.text("Update handling is not yet done", status=501)
return
logfile = os.path.join(glob.LOGDIR, "micropub-%s.log" % (self.dt.strftime("%Y-%m")))
with open (logfile, 'a') as micropublog:
logging.debug("logging micropub request")
micropublog.write("%s %s\n" % (self.dt.strftime('%Y-%m-%dT%H:%M:%S%z'), fpath))
micropublog.close()
with open (fpath, 'w') as mpf:
logging.info("writing file to: %s", fpath)
out = "---\n" + yaml.dump(self.meta, Dumper=yaml.RoundTripDumper, allow_unicode=True, indent=4) + "---\n\n" + self.content
mpf.write(out)
mpf.close()
self._git(fpath)
logging.info("trying to open and parse the received post")
post = singular.ArticleHandler(fpath, category=self.category)
post.write()
post.pings()
self.response = sanic.response.text(
"Post created",
status = 201,
headers = {
'Location': "%s/%s/" % (glob.conf['site']['url'], self.slug)
}
)
return
def _git(self, fpath):
logging.info("committing to git")
repo = Repo(glob.CONTENT)
author = Actor(glob.conf['author']['name'], glob.conf['author']['email'])
index = repo.index
newfile = fpath.replace(glob.CONTENT, '').lstrip('/')
index.add([newfile])
message = 'new content via micropub: %s' % (newfile)
index.commit(message, author=author, committer=author)
class SearchHandler(object):
def __init__ (self, query):
self.query = query
self.response = sanic.response.text("You seem to have forgot to enter what you want to search for. Please try again.", status=400)
if not query:
return
self._tmpl = glob.jinja2env.get_template('searchresults.html')
self._ix = index.open_dir(glob.SEARCHDB)
self._parse()
def _parse(self):
self.query = self.query.replace('+', ' AND ')
self.query = self.query.replace(' -', ' NOT ')
qp = qparser.MultifieldParser(
["title", "content", "tags"],
schema = glob.schema
)
q = qp.parse(self.query)
r = self._ix.searcher().search(q, sortedby="weight", limit=100)
logging.info("results for '%s': %i", self.query, len(r))
results = []
for result in r:
res = {
'title': result['title'],
'url': result['url'],
'highlight': result.highlights("content"),
}
if 'img' in result:
res['img'] = result['img']
results.append(res)
tvars = {
'term': self.query,
'site': glob.conf['site'],
'posts': results,
'taxonomy': {}
}
logging.info("collected %i results to render", len(results))
html = self._tmpl.render(tvars)
self.response = sanic.response.html(html, status=200)
class WebmentionHandler(object):
def __init__ ( self, source, target ):
self.source = source
self.target = target
self.time = arrow.utcnow().timestamp
logging.debug("validating: from: %s; to: %s" % (self.source, self.target) )
self.response = sanic.response.json({
'status': 'ok','msg': 'accepted',
}, 200)
self._validate()
self._parse()
self._archive()
self._send()
def _validate(self):
if not validators.url(self.source):
self.response = sanic.response.json({
'status': 'error','msg': '"source" parameter is an invalid URL',
}, 400)
return
if not validators.url(self.target):
self.response = sanic.response.json({
'status': 'error','msg': '"target" parameter is an invalid URL',
}, 400)
return
_target = urllib.parse.urlparse(self.target)
_target_domain = '{uri.netloc}'.format(uri=_target)
if not _target_domain in glob.conf['accept_domains']:
self.response = sanic.response.json({
'status': 'error',
'msg': "'%s' is not in the list of allowed domains" % (
_target_domain
)
}, 400)
return
_source = urllib.parse.urlparse(self.source)
_source_domain = '{uri.netloc}'.format(uri=_source)
if _source_domain == _target_domain and not glob.conf['allow_selfmention']:
self.response = sanic.response.json({
'status': 'error',
'msg': "selfpings are disabled"
}, 400)
return
return
def _parse(self):
if self.response.status != 200:
return
self._log()
self._source = urlinfo.UrlInfo(self.source)
if self._source.error:
logging.warning( "couldn't fetch %s; dropping webmention" % (self.source))
return
self.source = self._source.realurl
if not self._source.linksTo(self.target):
logging.warning( "%s is not linking to %s; dropping webmention" % (self.source, self.target))
return
self._target = urlinfo.UrlInfo(self.target)
if self._target.error:
logging.warning( "couldn't fetch %s; dropping webmention" % (self.target))
return
self.target = self._target.realurl
self.webmention = {
'author': self._source.author(),
'type': self._source.relationType(),
'target': self.target,
'source': self.source,
'date': arrow.get(self._source.pubDate()),
'content': pypandoc.convert_text(
self._source.content(),
to="markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
]),
format='html'
)
}
def _send(self):
if self.response.status != 200:
return
m = ToEmail(self.webmention)
m.send()
def _archive(self):
if self.response.status != 200:
return
fbase = self.webmention['date'].format('YYYY-MM-DD-HH-mm-ss')
fpath = self._archive_name(fbase)
archive = dict(self.webmention)
archive['date'] = archive['date'].format('YYYY-MM-DDTHH.mm.ssZ')
content = archive['content']
del(archive['content'])
with open (fpath, 'w') as f:
logging.info("writing file to: %s", fpath)
out = "---\n" + yaml.dump(
archive,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
indent=4
) + "---\n\n" + content
f.write(out)
f.close()
def _verify_archive(self, p):
archive = frontmatter.load(p)
if 'target' not in archive.metadata:
logging.warning('missing target')
return False
if 'source' not in archive.metadata:
logging.warning('missing source')
return False
if 'date' not in archive.metadata:
logging.warning('missing date')
return False
if archive.metadata['target'] != self.webmention['target']:
logging.warning('target different')
return False
if archive.metadata['source'] != self.webmention['source']:
logging.warning('source different')
return False
d = arrow.get(archive.metadata['date'])
if d.timestamp != self.webmention['date'].timestamp:
logging.warning('date different')
return False
# overwrite
return True
def _archive_name(self, archive, ext='.md'):
p = os.path.join(glob.COMMENTS, "%s%s" % (archive, ext))
if not os.path.exists(p):
logging.debug("%s doesn't exits yet" % p)
return p
logging.debug("%s exists, checking for update" % p)
if self._verify_archive(p):
return p
# another comment with the exact same second? why not.
names = [x for x in os.listdir(glob.COMMENTS) if x.startswith(archive)]
suffixes = [x.replace(archive, '').replace(ext, '').replace('.','') for x in names]
indexes = [int(x) for x in suffixes if x and set(x) <= set('0123456789')]
idx = 1
if indexes:
idx += sorted(indexes)[-1]
return os.path.join(glob.COMMENTS, "%s.%d%s" % (archive, idx, ext))
def _log(self):
if not os.path.isdir(glob.LOGDIR):
os.mkdir (glob.LOGDIR)
logfile = os.path.join(glob.LOGDIR, datetime.datetime.now().strftime("%Y-%m"))
s = json.dumps({
'time': self.time,
'source': self.source,
'target': self.target
})
with open(logfile, "a") as log:
logging.debug( "writing logfile %s with %s" % (logfile, s))
log.write("%s\n" % (s))
log.close()
class TimeSeriesHandler(object):
def __init__(self, tag):
if not os.path.isdir(glob.TSDBDIR):
os.mkdir(glob.TSDBDIR)
self.tag = tag
self.p = os.path.join(glob.TSDBDIR, '%s.csv' % (self.tag))
self.db = {}
#def _loaddb(self):
#if not os.path.isfile(self.p):
#return
#pattern = re.compile(r'^([0-9-\+:T]+)\s+(.*)$')
#searchfile = open(self.p, 'r')
#for line in searchfile:
#matched = re.match(pattern, line)
#if not matched:
#continue
#epoch = int(iso8601.parse_date(matched.group(1)).replace(tzinfo=pytz.utc).strftime('%s'))
#data = matched.group(2)
#self.db[epoch] = data
#searchfile.close()
#def _dumpdb(self):
#lines = []
#for e in self.db.items():
#epoch, data = e
#tstamp = datetime.datetime.utcfromtimestamp(epoch).replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
#line = '%s %s' % (tstamp, data)
#lines.append(line)
#bkp = '%s.bkp' % (self.p)
#shutil.copy(self.p, bkp)
#with open(self.p, "w") as searchfile:
#searchfile.write()
#del(cr)
#csvfile.close()
#os.unlink(bkp)
@staticmethod
def _common_date_base(d1, d2):
d1 = d1.replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
d2 = d2.replace(tzinfo=pytz.utc).strftime(glob.ISODATE)
l = len(d1)
common = ''
for i in range(l):
if d1[i] == d2[i]:
common = common + d1[i]
else:
break
return common
def search(self, when, tolerance=1800):
when = when.replace(tzinfo=pytz.utc)
tolerance = int(tolerance/2)
minwhen = when - datetime.timedelta(seconds=tolerance)
maxwhen = when + datetime.timedelta(seconds=tolerance)
closest = None
mindiff = float('inf')
common = TimeSeriesHandler._common_date_base(minwhen, maxwhen)
pattern = re.compile(r'^(%s[0-9-\+:T]+)\s+(.*)$' % (common))
searchfile = open(self.p, 'r')
for line in searchfile:
matched = re.match(pattern, line)
if not matched:
continue
d = iso8601.parse_date(matched.group(1))
diff = d - when
diff = abs(diff.total_seconds())
if diff >= mindiff:
continue
mindiff = diff
closest = (d, matched.group(2))
searchfile.close()
return closest
def append(self, data, dt=None):
if dt is None:
dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
if os.path.isfile(self.p):
epoch = int(dt.strftime('%s'))
stat = os.stat(self.p)
if epoch < stat.st_mtime:
logging.warning('Refusing to append %s with old data' % self.p)
return
with open(self.p, 'a') as db:
db.write("%s %s\n" % (
dt.strftime(glob.ISODATE),
data
))
class DataHandler(object):
def __init__(self, request):
self.request = request
self.dt = datetime.datetime.now().replace(tzinfo=pytz.utc)
self.response = sanic.response.text('accepted',status=200)
if not 'secrets' in glob.conf or \
not 'devices' in glob.conf['secrets']:
self.response = sanic.response.text(
'server configuration error',
status=501
)
return
if 'id' not in self.request.args:
self.response = sanic.response.text(
'device id not found in request',
status=401
)
return
id = self.request.args.get('id')
if id not in glob.conf['secrets']['devices'].keys():
self.response = sanic.response.text(
'device id rejected',
status=401
)
return
self.id = glob.conf['secrets']['devices'][id]
class OpenGTSHandler(DataHandler):
def __init__(self, *args, **kwargs):
super(OpenGTSHandler, self).__init__(*args, **kwargs)
self.lat = 0
self.lon = 0
self.alt = 0
self._parse()
self.l = '%s 0' % (self.dt.strftime(glob.ISODATE))
def _parse(self):
logging.debug('--- incoming location request ---')
logging.debug(self.request.args)
if 'latitude' in self.request.args and 'longitude' in self.request.args:
self.lat = float(self.request.args.get('latitude'))
self.lon = float(self.request.args.get('longitude'))
elif 'gprmc' in self.request.args:
gprmc = pynmea2.parse(self.request.args.get('gprmc'))
try:
self.lat = float(gprmc.latitude)
self.lon = float(gprmc.longitude)
except:
self.response = sanic.response.text(
"could not process gprmc string",
status=422
)
return
else:
self.response = sanic.response.text(
"no location information found in query",
status=401
)
return
if 'exclude_coordinates' in glob.conf['secrets']:
excl = {}
for t in ['lat', 'lon']:
excl[t] = []
if t in glob.conf['secrets']['exclude_coordinates']:
for c in glob.conf['secrets']['exclude_coordinates'][t]:
excl[t].append(float(c))
if round(self.lat,2) in excl['lat'] and round(self.lon,2) in excl['lon']:
self.response = sanic.response.text(
"this location is on the excluded list",
status=200
)
return
if 'loc_timestamp' in self.request.args and 'offset' in self.request.args:
# this is a bit ugly: first convert the epoch to datetime
# then append it with the offset as string
# and convert the string back to datetime from the iso8601 string
dt = datetime.datetime.utcfromtimestamp(int(self.request.args.get('loc_timestamp')))
dt = dt.strftime('%Y-%m-%dT%H:%M:%S')
dt = "%s%s" % (dt, self.request.args.get('offset'))
try:
self.dt = iso8601.parse_date(dt).replace(tzinfo=pytz.utc)
except:
pass
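# For example (assumed values): loc_timestamp=1489437846 with offset=+0100
# becomes "2017-03-13T20:44:06" + "+0100", which iso8601.parse_date() parses
# before the tzinfo is forced back to UTC above.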
if 'altitude' in self.request.args:
self.alt = float(self.request.args.get('altitude'))
else:
try:
self.alt = OpenGTSHandler.altitude_from_bing(self.lat, self.lon)
except:
pass
self.lat = "{:4.6f}".format(float(self.lat))
self.lon = "{:4.6f}".format(float(self.lon))
self.alt = "{:4.6f}".format(float(self.alt))
l = '%s %s %s' % (self.lat, self.lon, self.alt)
gpsfile = TimeSeriesHandler('location')
gpsfile.append(l, dt=self.dt)
@staticmethod
def altitude_from_bing(lat, lon):
if 'bing_key' not in glob.conf['secrets']:
return 0
if not glob.conf['secrets']['bing_key']:
return 0
url = "http://dev.virtualearth.net/REST/v1/Elevation/List?points=%s,%s&key=%s" % (
lat,
lon,
glob.conf['secrets']['bing_key']
)
bing = requests.get(url)
bing = json.loads(bing.text)
if 'resourceSets' not in bing or \
'resources' not in bing['resourceSets'][0] or \
'elevations' not in bing['resourceSets'][0]['resources'][0] or \
not bing['resourceSets'][0]['resources'][0]['elevations']:
return 0
alt = float(bing['resourceSets'][0]['resources'][0]['elevations'][0])
del(bing)
del(url)
return alt
class SensorHandler(DataHandler):
def __init__(self, *args, **kwargs):
super(SensorHandler, self).__init__(*args, **kwargs)
self.data = 0
self.tag = ''
self._parse()
def _parse(self):
logging.debug('--- incoming sensor request ---')
logging.debug(self.request.args)
for tag in self.request.args:
if tag == 'id':
continue
datafile = TimeSeriesHandler('%s-%s' % (self.id, tag))
datafile.append(self.request.args.get(tag), dt=self.dt)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
app = Sanic()
@app.route("/webmention")
async def wm(request, methods=["POST"]):
source = request.form.get('source')
target = request.form.get('target')
r = WebmentionHandler(source, target)
return r.response
@app.route("/search")
async def search(request, methods=["GET"]):
query = request.args.get('s')
r = SearchHandler(query)
return r.response
@app.route("/micropub")
async def mpub(request, methods=["POST","GET"]):
r = MicropubHandler(request)
return r.response
@app.route("/opengts")
async def opengts(request, methods=["GET"]):
r = OpenGTSHandler(request)
return r.response
@app.route("/sensor")
async def sensor(request, methods=["GET"]):
r = SensorHandler(request)
return r.response
if __name__ == "__main__":
app.run(host="127.0.0.1", port=8000, debug=True)
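For reference, a sketch of how the /webmention route above would be exercised; the URLs are made up and only the two form fields read by WebmentionHandler are shown:

import requests

requests.post('http://127.0.0.1:8000/webmention', data={
    'source': 'https://example.com/a-reply',
    'target': 'https://petermolnar.net/some-post/',
})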

View file

@@ -1,916 +0,0 @@
import os
import re
import sys
import collections
import logging
import glob
import img
import pypandoc
import langdetect
from cache import Cached
from slugify import slugify
from ruamel import yaml
from bs4 import BeautifulSoup
import frontmatter
from webmentiondb import WebmentionDB
import arrow
import json
import socket
import requests
import hashlib
import shutil
class SingularHandler(object):
def __init__(self, fpath, pingdb=WebmentionDB(), category='note'):
self.fpath = os.path.abspath(fpath)
path, fname = os.path.split(self.fpath)
fname, ext = os.path.splitext(fname)
self.fname = fname
self.fext = ext
self.ftime = os.stat(self.fpath)
self.target = os.path.join(glob.TARGET, "%s.html" % (self.fname))
basedir = os.path.join(glob.TARGET, "%s" % (self.fname))
if not os.path.isdir(basedir):
os.mkdir(basedir)
self.saved = os.path.join(glob.TARGET, "%s" % (self.fname), "saved.html")
self.pingdb = pingdb
self.title = ''
self.content = ''
self._content = ''
self.summary = ''
self.html = ''
self.sumhtml = ''
self.category = category
self.tags = []
self.reactions = {}
#self.date = datetime.datetime(1970, 1, 1).replace(tzinfo=pytz.utc)
self.date = arrow.get(0)
self.updated = None
self.dtime = 0
self.utime = 0
self.redirect = {}
self.exifmin = {}
self.lang = glob.conf['site']['lang']
self.syndicate = {}
self.syndications = []
self.template = 'singular.html'
self.slug = slugify(self.fname, only_ascii=True, lower=True)
self.shortslug = slugify(self.fname, only_ascii=True, lower=True)
self.img = None
self.srcset = ''
def __repr__(self):
return "Post '%s' (%s), category: %s" % (self.title,self.fname,self.category)
def _postsetup(self):
""" Shared post-setup - the initial thing, such at title, should be
set by the classes inheriting this one; these are only the common,
shared variables """
# set published epoch
#self.dtime = calendar.timegm(self.date.timetuple())
self.dtime = self.date.timestamp
# set updated epoch, if any and set the original file date according
# to either the updated or the published time
if self.updated:
#self.utime = calendar.timegm(self.updated.timetuple())
self.utime = self.updated.timestamp
if self.utime > 0 and self.utime != self.ftime.st_mtime:
os.utime(self.fpath, (self.utime, self.utime))
elif self.dtime > 0 and self.dtime != self.ftime.st_mtime:
os.utime(self.fpath, (self.dtime, self.dtime))
# generate shortslug from dtime if possible
if self.dtime > 0:
self.shortslug = SingularHandler.baseN(self.dtime)
self.redirect[self.shortslug] = 1
# detect post content language if possible
try:
self.lang = langdetect.detect("%s\n\n%s" % (self.title, self.content))
except:
pass
# make HTML from markdown via pandoc for the content and the summary
self.html = SingularHandler.pandoc_md2html(
self.content,
time=self.ftime
)
self.sumhtml = SingularHandler.pandoc_md2html(
self.summary,
time=self.ftime
)
self.url = "%s/%s" % (glob.conf['site']['url'], self.slug)
self.syndications = self.pingdb.posses(self.url)
#def urlsvg(self):
# import pyqrcode
# import tempfile
## generate qr code to the url
#qrname = tempfile.NamedTemporaryFile(prefix='pyqr_')
#qr = pyqrcode.create(self.url, error='L')
#qr.svg(
#qrname.name,
#xmldecl=False,
#omithw=True,
#scale=1,
#quiet_zone=0,
#svgclass='qr',
#lineclass='qrline'
#)
#with open(qrname.name) as f:
#qrsvg = f.read()
#f.close()
#return qrsvg
@staticmethod
def pandoc_md2html(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
extras = [
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
]
md = "markdown+" + "+".join(extras)
t = pypandoc.convert_text(t, to='html5', format=md)
cached.set(t)
return t
@staticmethod
def pandoc_html2md(t, time=None):
if len(t) == 0:
return t
cached = Cached(text="%s" % t, stime=time)
c = cached.get()
if c:
return c
else:
t = pypandoc.convert_text(
t,
to="markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
]),
format='html'
)
cached.set(t)
return t
def tmpl(self):
return {
'title': self.title,
'published': self.date,
'tags': self.tags,
'author': glob.conf['author'],
'content': self.content,
'html': self.html,
'category': self.category,
'reactions': self.reactions,
'updated': self.updated,
'summary': self.sumhtml,
'exif': self.exifmin,
'lang': self.lang,
'syndicate': self.syndicate,
'slug': self.slug,
'shortslug': self.shortslug,
'srcset': self.srcset,
}
@staticmethod
def write_redirect(sslug, target, tstamp=arrow.utcnow().timestamp):
tmpl = glob.jinja2env.get_template('redirect.html')
jvars = {
'url': target
}
r = tmpl.render(jvars)
# this is to support / ending urls even for the redirects
dirs = [
os.path.join(glob.TARGET, sslug)
]
for d in dirs:
if not os.path.exists(d):
os.mkdir(d)
files = [
os.path.join(glob.TARGET, "%s.html" % (sslug)),
os.path.join(glob.TARGET, sslug, "index.html")
]
for f in files:
if os.path.isfile(f):
rtime = os.stat(f)
if tstamp == rtime.st_mtime:
logging.debug(
"Unchanged dates on redirect file %s", f
)
continue
with open(f, "w") as html:
logging.info("writing redirect file %s", f)
html.write(r)
html.close()
os.utime(f, (tstamp,tstamp))
def redirects(self):
""" Write redirect HTMLs """
if self.category == 'page':
return
for sslug in self.redirect.keys():
SingularHandler.write_redirect(sslug, self.url, self.ftime.st_mtime)
def write(self):
""" Write HTML file """
if os.path.isfile(self.target):
ttime = os.stat(self.target)
if self.ftime.st_mtime == ttime.st_mtime and not glob.FORCEWRITE:
logging.debug(
"Unchanged dates on %s; skipping rendering and writing",
self.fname
)
return
tmpl = glob.jinja2env.get_template(self.template)
logging.info("rendering %s", self.fname)
tmplvars = {
'post': self.tmpl(),
'site': glob.conf['site'],
'taxonomy': {},
}
r = tmpl.render(tmplvars)
soup = BeautifulSoup(r,"html5lib")
r = soup.prettify()
targets = [self.target]
for target in targets:
with open(target, "w") as html:
logging.info("writing %s", target)
html.write(r)
html.close()
os.utime(target, (self.ftime.st_mtime, self.ftime.st_mtime))
rdir = os.path.join(glob.TARGET, self.slug)
if not os.path.isdir(rdir):
os.mkdir(rdir)
altdst = os.path.join(glob.TARGET, self.slug, 'index.html')
altsrc = os.path.join('..', self.target)
if not os.path.islink(altdst):
if os.path.isfile(altdst):
os.unlink(altdst)
os.symlink(altsrc, altdst)
#links = []
#for r in self.reactions.items():
#reactiontype, urls = r
#if isinstance(urls, str):
#links.append(urls)
#elif isinstance(urls, list):
#links = [*links, *urls]
#if 1 == len(links):
#saved = os.path.join(glob.TARGET, self.slug, 'saved.html')
#if not os.path.isfile(saved):
#h, p = _localcopy_hashpath(links[0])
#c = self._get_localcopy(links[0], h, p)
#with open(saved, 'w') as f:
#f.write(c)
#f.close()
def index(self, ix):
""" Write search index """
writer = ix.writer()
c = "%s %s %s %s %s" % (
self.slug,
self.summary,
self._content,
yaml.dump(self.reactions, Dumper=yaml.RoundTripDumper),
yaml.dump(self.exifmin, Dumper=yaml.RoundTripDumper)
)
c = "%s %s" % (c, self._localcopy_include())
if self.img:
imgstr = self.img.mksrcset(generate_caption=False)
else:
imgstr = ''
writer.add_document(
title=self.title,
url=self.url,
content=c,
date=self.date.datetime,
tags=",".join(self.tags),
weight=1,
img=imgstr
)
writer.commit()
def pings(self):
""" Ping (webmention) all URLs found in the post """
links = []
urlregex = re.compile(
r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
)
matches = re.findall(urlregex, self.content)
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
matches.append(urls)
elif isinstance(urls, list):
matches = [*matches, *urls]
#for s in self.syndicate.keys():
#matches.append('https://brid.gy/publish/%s' % (s))
if self.utime and self.utime > 0:
time = self.utime
else:
time = self.dtime
if len(matches) > 0:
for link in matches:
if glob.conf['site']['domain'] in link:
continue
if link in links:
continue
#self._localcopy(link)
self.pingdb.ping(self.url, link, time)
links.append(link)
def _localcopy_hashpath(self,url):
h = hashlib.md5(url.encode('utf-8')).hexdigest()
p = os.path.join(glob.LOCALCOPIES, "%s.html" % (h))
return (h, p)
def _localcopy_include(self):
links = []
md = ''
for r in self.reactions.items():
reactiontype, urls = r
if isinstance(urls, str):
links.append(urls)
elif isinstance(urls, list):
links = [*links, *urls]
for url in links:
h, p = self._localcopy_hashpath(url)
html = self._get_localcopy(url, h, p)
md = "%s %s" % (
md,
SingularHandler.pandoc_html2md(html, os.stat(p))
)
return md
def _get_localcopy(self, url, h, p):
html = ''
if os.path.isfile(p):
with open(p, 'r') as f:
html = f.read()
f.close()
else:
html = self._make_localcopy(url, h, p)
return html
def _make_localcopy(self, url, h, p):
post = self._pull_localcopy(url)
tmpl = glob.jinja2env.get_template('localcopy.html')
html = tmpl.render({'post': post})
soup = BeautifulSoup(html,"html5lib")
html = soup.prettify()
with open(p, "w") as f:
logging.info("saving readable copy of %s to %s", url, p)
f.write(html)
f.close()
return html
def _pull_localcopy(self, url):
# find the true URL
# MAYBE: add fallback to archive.org?
realurl = url
try:
pretest = requests.head(url, allow_redirects=True, timeout=30)
realurl = pretest.url
except:
pass
parsed = {
'lang': 'en',
'url': url,
'realurl': realurl,
'html': '',
'title': '',
'excerpt': '',
'byline': '',
}
if 'readable' in glob.conf and \
'port' in glob.conf['readable'] and \
'host' in glob.conf['readable']:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socktest = sock.connect_ex((
glob.conf['readable']['host'], int(glob.conf['readable']['port'])
))
if 0 == socktest:
text = self._localcopy_via_proxy(realurl)
parsed['html'] = text.get('content','')
parsed['title'] = text.get('title',url)
parsed['excerpt'] = text.get('excerpt', '')
parsed['byline'] = text.get('byline', '')
try:
parsed['lang'] = langdetect.detect(parsed['html'])
except:
pass
return parsed
# TODO: fallback to full-python solution if the previous failed
return parsed
def _localcopy_via_proxy(self, url):
r = "http://%s:%s/api/get?url=%s&sanitize=y" % (
glob.conf['readable']['host'],
glob.conf['readable']['port'],
url
)
try:
req = requests.get(r,allow_redirects=False,timeout=60);
except:
return None
text = {}
try:
text = json.loads(req.text)
except:
pass
return text
def _adaptify(self):
""" Generate srcset for all images possible """
linkto = False
isrepost = None
if len(self.reactions.keys()):
isrepost = list(self.reactions.keys())[0]
if isrepost:
if len(self.reactions[isrepost]) == 1:
linkto = self.reactions[isrepost][0]
mdmatch = re.compile(
r'!\[.*\]\(.*?\.(?:jpe?g|png|gif)'
r'(?:\s+[\'\"]?.*?[\'\"]?)?\)(?:\{.*?\})?'
)
mdsplit = re.compile(
r'!\[(.*)\]\((?:\/(?:files|cache)'
r'(?:\/[0-9]{4}\/[0-9]{2})?\/(.*\.(?:jpe?g|png|gif)))'
r'(?:\s+[\'\"]?(.*?)[\'\"]?)?\)(?:\{(.*?)\})?'
)
mdimg = re.findall(mdmatch, self.content)
for i in mdimg:
m = re.match(mdsplit, i)
if m:
#logging.info(m.groups())
imgpath = os.path.join(glob.SFILES, m.group(2))
if not os.path.isfile(imgpath):
for c in glob.conf['category'].items():
catn, catd = c
catp = os.path.abspath(os.path.join(glob.CONTENT, catn))
if not os.path.exists(catp) \
or not 'type' in catd \
or catd['type'] != 'photo':
continue
imgpath = os.path.join(catp, m.group(2))
break
if os.path.isfile(imgpath):
t = ''
if m.group(3):
t = m.group(3)
cl = ''
if m.group(4):
cl = m.group(4)
a = ''
if m.group(1):
a = m.group(1)
im = img.ImageHandler(
imgpath,
alttext=a,
title=t,
imgcl=cl,
linkto=linkto
)
im.downsize()
logging.debug("replacing image %s with srcset", imgpath)
srcset = im.mksrcset()
if srcset:
self.content = self.content.replace(i, srcset)
del(im)
else:
logging.error("%s missing %s", m.group(2), self.fpath)
def _video(self):
""" [video] shortcode extractor """
match = re.compile(r'\[video mp4=\"/(?:files|cache).*?\"\]\[/video\]')
split = re.compile(r'\[video mp4=\"(/(?:files|cache)\/(.*?))\"\]\[/video\]')
videos = re.findall(match, self.content)
for vid in videos:
v = re.match(split, vid)
video = """
<video controls>
<source src="%s" type="video/mp4">
Your browser does not support the video tag.
</video>""" % (v.group(1))
self.content = self.content.replace(vid, video)
#def _files(self):
#""" Copy misc files referenced """
#match = re.compile(
#r'\s(?:%s)?/(?:files|cache)'
#r'/.*\.(?:(?!jpe?g|png|gif).*)\s' % (glob.conf['site']['domain'])
#)
#split = re.compile(
#r'\s(?:%s)?/((?:files|cache)'
#r'/(.*\.(?:(?!jpe?g|png|gif).*)))\s' % (glob.conf['site']['domain'])
#)
##files = re.findall(match, self.content)
##print(files)
def _snippets(self):
""" Replaces [git:(repo)/(file.ext)] with corresponding code snippet """
snmatch = re.compile(r'\[git:[^\/]+\/(?:.*\..*)\]')
snsplit = re.compile(r'\[git:([^\/]+)\/((?:.*)\.(.*))\]')
snippets = re.findall(snmatch, self.content)
isconf = re.compile(r'conf', re.IGNORECASE)
for snippet in snippets:
sn = re.match(snsplit, snippet)
if sn:
fpath = os.path.join(glob.SOURCE, sn.group(1), sn.group(2))
if not os.path.isfile(fpath):
logging.error(
"missing blogsnippet in %s: %s",
self.fpath,
fpath
)
continue
if re.match(isconf, sn.group(3)):
lang = 'apache'
else:
lang = sn.group(3)
with open(fpath, "r") as snip:
c = snip.read()
snip.close()
c = "\n\n```%s\n%s\n```\n" % (lang, c)
logging.debug("replacing blogsnippet %s", fpath)
self.content = self.content.replace(snippet, c)
@staticmethod
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
""" Used to create a short, lowercase slug for a number (an epoch) passed """
num = int(num)
return ((num == 0) and numerals[0]) or (
SingularHandler.baseN(
num // b,
b,
numerals
).lstrip(numerals[0]) + numerals[num % b]
)
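# Worked example: 1489437846 (2017-03-13T20:44:06Z) in base 36 is 'omrtli',
# which is the expectation encoded in nasg/tests/func.py above.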
class ArticleHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(ArticleHandler, self).__init__(*args, **kwargs)
self.dctype = 'Text'
self._setup()
def _setup(self):
post = frontmatter.load(self.fpath)
self.meta = post.metadata
self.content = post.content
self._content = '%s' % (self.content)
if 'tags' in post.metadata:
self.tags = post.metadata['tags']
if 'title' in post.metadata:
self.title = post.metadata['title']
if 'published' in post.metadata:
self.date = arrow.get(post.metadata['published'])
if 'updated' in post.metadata:
self.updated = arrow.get(post.metadata['updated'])
if 'summary' in post.metadata:
self.summary = post.metadata['summary']
if 'redirect' in post.metadata and \
isinstance(post.metadata['redirect'], list):
for r in post.metadata['redirect']:
self.redirect[r] = 1
if 'syndicate' in post.metadata:
z = post.metadata['syndicate']
if isinstance(z, str):
self.syndicate[z] = ''
elif isinstance(z, dict):
for s, c in z.items():
self.syndicate[s] = c
elif isinstance(z, list):
for s in z:
self.syndicate[s] = ''
self.reactions = {}
# getting rid of '-' to avoid css trouble and similar
rmap = {
'bookmark-of': 'bookmark',
'repost-of': 'repost',
'in-reply-to': 'reply',
}
for x in rmap.items():
key, replace = x
if key in self.meta:
if isinstance(self.meta[key], str):
self.reactions[replace] = [self.meta[key]]
elif isinstance(self.meta[key], list):
self.reactions[replace] = self.meta[key]
self._adaptify()
self._snippets()
self._video()
#self._files()
super(ArticleHandler, self)._postsetup()
class PhotoHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PhotoHandler, self).__init__(*args, **kwargs)
self.dctype = 'Image'
self.img = img.ImageHandler(self.fpath)
self.exif = self.img.exif
self._setup()
def _setup(self):
self.syndicate = {
'flickr': '',
}
keywords = [
'XMP:Keywords',
'IPTC:Keywords'
]
tags = {}
for key in keywords:
if key in self.exif and self.exif[key]:
if isinstance(self.exif[key], str):
self.exif[key] = self.exif[key].split(",")
if isinstance(self.exif[key], list):
for tag in self.exif[key]:
tags[str(tag).strip()] = 1
self.tags = list(tags.keys())
# content
keywords = [
'XMP:Description',
'IPTC:Caption-Abstract'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.content = self.exif[key]
break
self._content = '%s' % (self.content)
# title
keywords = [
'XMP:Title',
'XMP:Headline',
'IPTC:Headline'
]
for key in keywords:
if key in self.exif and self.exif[key]:
self.title = self.exif[key]
break
# datetime
keywords = [
'XMP:DateTimeDigitized',
'XMP:CreateDate',
'EXIF:CreateDate',
'EXIF:ModifyDate'
]
pattern = re.compile(
"(?P<Y>[0-9]{4}):(?P<M>[0-9]{2}):(?P<D>[0-9]{2})\s+"
"(?P<T>[0-9]{2}:[0-9]{2}:[0-9]{2})Z?"
)
for key in keywords:
if key not in self.exif or not self.exif[key]:
continue
date = None
v = pattern.match(self.exif[key]).groupdict()
if not v:
continue
try:
date = arrow.get('%s-%s-%s %s' % (v['Y'], v['M'], v['D'], v['T']))
except:
continue
if date:
self.date = date
logging.debug("date for %s is set to %s from key %s", self.fname, self.date, key)
break
self.img.title = self.title
self.img.alttext = self.content
self.content = self.content + "\n\n" + self.img.mksrcset(generate_caption=False, uphoto=True)
self.img.downsize()
self.srcset = self.img.mksrcset(generate_caption=False, uphoto=False)
super(PhotoHandler, self)._postsetup()
def tmpl(self):
tmpl = super(PhotoHandler, self).tmpl()
tmpl['exif'] = {}
mapping = {
'camera': [
'EXIF:Model'
],
'aperture': [
'EXIF:FNumber',
'Composite:Aperture'
],
'shutter_speed': [
'EXIF:ExposureTime'
],
'focallength': [
'EXIF:FocalLength',
'Composite:FocalLength35efl',
],
'iso': [
'EXIF:ISO'
],
'lens': [
'Composite:LensID',
'MakerNotes:Lens',
'Composite:LensSpec'
]
}
for ekey, candidates in mapping.items():
for candidate in candidates:
if candidate in self.exif:
tmpl['exif'][ekey] = self.exif[candidate]
break
gps = ['Latitude', 'Longitude']
for g in gps:
gk = 'EXIF:GPS%s' % (g)
if gk not in self.exif:
continue
r = 'EXIF:GPS%sRef' % (g)
ref = None
if r in self.exif:
ref = self.exif[r]
tmpl['exif']['geo_%s' % (g.lower())] = self.gps2dec(
self.exif[gk],
ref
)
##tmpl['imgurl'] = ''
#sizes = collections.OrderedDict(reversed(list(self.img.sizes.items())))
#for size, meta in sizes.items():
#if os.path.isfile(meta['path']):
#with Image.open(meta['path']) as im:
#meta['width'], meta['height'] = im.size
#meta['size'] = os.path.getsize(meta['path'])
#tmpl['img'] = meta
#break
tmpl['img'] = self.img.meta
return tmpl
@staticmethod
def gps2dec(exifgps, ref=None):
pattern = re.compile(r"(?P<deg>[0-9.]+)\s+deg\s+(?P<min>[0-9.]+)'\s+(?P<sec>[0-9.]+)\"(?:\s+(?P<dir>[NEWS]))?")
v = pattern.match(exifgps).groupdict()
dd = float(v['deg']) + (((float(v['min']) * 60) + (float(v['sec']))) / 3600)
if ref == 'West' or ref == 'South' or v['dir'] == "S" or v['dir'] == "W":
dd = dd * -1
return round(dd, 6)
class PageHandler(SingularHandler):
def __init__(self, *args, **kwargs):
super(PageHandler, self).__init__(*args, **kwargs)
self._setup()
def _setup(self):
with open(self.fpath) as c:
self.content = c.read()
c.close()
self._content = '%s' % (self.content)
self._adaptify()
super(PageHandler, self)._postsetup()
self.template = 'page.html'

View file

@@ -1,253 +0,0 @@
import math
import logging
import os
import collections
import json
import glob
from slugify import slugify
from bs4 import BeautifulSoup
from pprint import pprint
class TaxonomyHandler(object):
def __init__(self, taxonomy='', name='', description='', exclude=False):
self.taxonomy = taxonomy
self.name = name
self.description = description
self.exclude = exclude
self.slug = slugify(self.name, only_ascii=True, lower=True)
self.posts = collections.OrderedDict()
self.taxp = os.path.join(glob.TARGET, self.taxonomy)
self.simplepath = os.path.join(self.taxp, 'index.html')
self.basep = os.path.join(self.taxp, self.slug)
self.pagedp = os.path.join(self.basep, 'page')
self.indexpath = os.path.join(self.basep, 'index.html')
self.lptime = 0
def __getitem__(self, key):
return self.posts[key]
def __repr__(self):
return 'Taxonomy %s (name: %s, slug: %s) with %i posts' % (
self.taxonomy,
self.name,
self.slug,
len(self.posts)
)
def __next__(self):
try:
r = self.posts.next()
except:
raise StopIteration()
return r
def __iter__(self):
for ix, post in self.posts.items():
yield post
return
def append(self, post):
k = int(post.date.timestamp)
while k in self.posts:
k = int(k + 1)
self.posts[k] = post
self.posts = collections.OrderedDict(sorted(self.posts.items(), reverse=True))
def index(self, ix):
""" Write search index """
writer = ix.writer()
t, lp = list(self.posts.items())[0]
writer.add_document(
title=self.name,
url="%s/%s/%s" % (glob.conf['site']['url'], self.taxonomy, self.slug),
content="%s %s" % (self.name, self.slug),
date=lp.date.datetime,
tags=",".join([self.name]),
weight=10
)
writer.commit()
def _test_freshness(self):
t, lp = list(self.posts.items())[0]
self.lptime = lp.ftime.st_mtime
if os.path.isfile(self.indexpath):
p = self.indexpath
elif os.path.isfile(self.simplepath):
p = self.simplepath
else:
return False
itime = os.stat(p)
if itime.st_mtime == self.lptime and not glob.FORCEWRITE:
logging.debug(
'Taxonomy tree is fresh for %s' % (self.name)
)
return True
return False
def _test_dirs(self):
if not os.path.isdir(self.taxp):
os.mkdir(self.taxp)
if not os.path.isdir(self.basep):
os.mkdir(self.basep)
def write_paginated(self):
if self._test_freshness():
return
self._test_dirs()
taxp = os.path.join(glob.TARGET, self.taxonomy)
basep = os.path.join(glob.TARGET, self.taxonomy, self.slug)
if not os.path.isdir(taxp):
os.mkdir(taxp)
if not os.path.isdir(basep):
os.mkdir(basep)
pages = math.ceil(len(self.posts) / glob.conf['perpage'])
page = 1
if len(self.taxonomy) and len(self.slug):
base_url = "/%s/%s/" % (self.taxonomy, self.slug)
else:
base_url = '/'
while page <= pages:
start = int((page-1) * int(glob.conf['perpage']))
end = int(start + int(glob.conf['perpage']))
do_rss = False
posttmpls = [self.posts[k].tmpl() for k in list(sorted(
self.posts.keys(), reverse=True))[start:end]]
if page == 1:
tpath = self.indexpath
do_rss = True
# RSS
else:
do_rss = False
if not os.path.isdir(self.pagedp):
os.mkdir(self.pagedp)
tdir = os.path.join(self.pagedp, "%d" % page)
if not os.path.isdir(tdir):
os.mkdir(tdir)
tpath = os.path.join(tdir, "index.html")
tvars = {
'taxonomy': {
'url': base_url,
'name': self.name,
'taxonomy': self.taxonomy,
'description': self.description,
'paged': page,
'total': pages,
'perpage': glob.conf['perpage'],
},
'site': glob.conf['site'],
'posts': posttmpls,
}
tmpl = glob.jinja2env.get_template('archive.html')
logging.info("rendering %s" % (tpath))
with open(tpath, "w") as html:
r = tmpl.render(tvars)
soup = BeautifulSoup(r, "html5lib")
r = soup.prettify()
logging.info("writing %s" % (tpath))
html.write(r)
html.close()
os.utime(tpath, (self.lptime, self.lptime))
if do_rss:
feeddir = os.path.join(self.basep, 'feed')
if not os.path.isdir(feeddir):
os.mkdir(feeddir)
feedpath = os.path.join(feeddir, "index.xml")
tmpl = glob.jinja2env.get_template('rss.html')
logging.info("rendering %s" % (feedpath))
with open(feedpath, "w") as html:
r = tmpl.render(tvars)
logging.info("writing %s" % (feedpath))
html.write(r)
html.close()
os.utime(feedpath, (self.lptime, self.lptime))
page = page+1
def write_simple(self, template='archive.html'):
if self._test_freshness():
return
self._test_dirs()
base_url = "/%s/" % (self.slug)
posttmpls = [self.posts[k].tmpl() for k in list(sorted(
self.posts.keys(), reverse=True))]
tvars = {
'taxonomy': {
'url': base_url,
'name': self.name,
'taxonomy': self.taxonomy,
'description': self.description,
'paged': 0,
'total': 0,
'perpage': glob.conf['perpage'],
},
'site': glob.conf['site'],
'posts': posttmpls,
}
with open(os.path.join(self.simplepath), "w") as html:
html.write(json.dumps(tvars, indent=4, sort_keys=True, default=str))
html.close()
#tmpl = glob.jinja2env.get_template('gallery.html')
#logging.info("rendering %s" % (indexpath))
#with open(indexpath, "w") as html:
#r = tmpl.render(tvars)
#soup = BeautifulSoup(r, "html5lib")
#r = soup.prettify()
#logging.info("writing %s" % (indexpath))
#html.write(r)
#html.close()
#os.utime(indexpath, (lptime, lptime))
def writesitemap(self):
sitemap = "%s/sitemap.txt" % (glob.TARGET)
urls = []
for p in self.posts.items():
t, data = p
urls.append( "%s/%s" % ( glob.conf['site']['url'], data.slug ) )
with open(sitemap, "w") as f:
logging.info("writing %s" % (sitemap))
f.write("\n".join(urls))
f.close()
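The pagination in write_paginated boils down to the slicing below; perpage and the post list are illustrative values, not the site configuration:

import math

perpage = 10
posts = list(range(25))
pages = math.ceil(len(posts) / perpage)      # 3 archive pages
for page in range(1, pages + 1):
    start = (page - 1) * perpage
    chunk = posts[start:start + perpage]     # 10, 10, then 5 posts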

View file

@@ -1,20 +0,0 @@
#!/usr/bin/env bash
if [ -f "/tmp/petermolnar.net.generator.lock" ]; then
exit 0;
fi;
lastfile="$(find /home/petermolnar.net/source/ -type f -name '*.md' -printf '%T+ %p\n' | sort | tail -n1 | awk '{print $2}')";
lastfilemod=$(stat -c %Y "$lastfile");
lastrunfile="/tmp/generator_last_run";
lastrun=0;
if [ -f "$lastrunfile" ]; then
lastrun=$(stat -c %Y "$lastrunfile");
fi;
if [ "$lastrun" -lt "$lastfilemod" ]; then
cd /home/petermolnar.net/src; ../.venv/bin/python3.5 generator.py;
fi;
exit 0;

View file

@@ -1,103 +0,0 @@
import os
import hashlib
import logging
import glob
from webmentiontools.send import WebmentionSend
import requests
import json
class WebmentionDB(object):
dbpath = glob.WEBMENTIONDB
def __init__(self):
self.sent = {}
self._loaddb()
def _loaddb(self):
if os.path.isfile(self.dbpath):
logging.info("loading pinged database")
with open(self.dbpath, 'r') as db:
self.sent = json.loads(db.read())
def _dumpdb(self):
with open(self.dbpath, "w") as db:
logging.info("writing pinged database")
db.write(json.dumps(self.sent, indent=4, sort_keys=True))
db.close()
def _refreshdb(self):
self._dumpdb()
self._loaddb()
def __getitem__(self, key):
r = {}
for i in self.sent.items():
h, data = i
if data['source'] == key:
r[data['target']] = {
'time': data['time'],
'response': data['response']
}
return r
def __len__(self):
return len(self.sent)
def posses(self, key):
r = []
for i in self.sent.items():
h, data = i
if data['source'] != key:
continue
if not len(data['response']):
continue
if 'url' not in data['response']:
continue
r.append(data['response']['url'])
return r
def ping(self, source, target, time=0, posse=False):
resp = {}
source = source.strip()
target = target.strip()
h = source + target + "%i" % (int(time))
h = h.encode('utf-8')
h = hashlib.sha1(h).hexdigest()
if h in self.sent.keys():
logging.debug("already pinged: %s" % (target))
return True
logging.debug("pinging: %s" % (target))
wm = WebmentionSend(source, target)
if hasattr(wm, 'response'):
resp = wm.response
# fire and forget archive.org call
try:
verify = requests.get(
'%s%s' % ('https://web.archive.org/save/', target),
allow_redirects=False,
timeout=30,
)
except:
pass
self.sent[h] = {
'source': source,
'target': target,
'time': time,
'response': resp
}
self._refreshdb()
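A hedged example of the ping interface defined above; the URLs are invented, and a repeated call with the same source, target and time is skipped thanks to the sha1 key:

db = WebmentionDB()
db.ping(
    'https://petermolnar.net/example-post/',
    'https://example.org/linked-page',
    time=1489437846
)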