import configparser
import os
import re
import glob
import logging
import subprocess
import json
import sqlite3
import requests
from slugify import slugify
import jinja2


class CMDLine(object):
    """Thin base class for wrappers around external command-line programs.

    Resolves the executable on PATH at construction time so a missing
    binary fails early instead of at first use.
    """

    def __init__(self, executable):
        self.executable = self._which(executable)
        if self.executable is None:
            raise OSError('No %s found in PATH!' % executable)

    @staticmethod
    def _which(name):
        """Return the first PATH match for `name`, or None if not found."""
        for d in os.environ['PATH'].split(':'):
            which = glob.glob(os.path.join(d, name), recursive=True)
            if which:
                return which.pop()
        return None


class XRay(CMDLine):
    """Run the p3k\\XRay PHP parser on a URL via `php -r` and return JSON."""

    # PHP preamble: load XRay from its install location and instantiate it
    cmd_prefix = 'chdir("/usr/local/lib/php/xray"); include("vendor/autoload.php"); $xray = new p3k\XRay();'

    def __init__(self, url):
        super().__init__('php')
        self.url = url
        self.target = ''
        # default command: plain parse of the URL
        self.cmd = (
            self.executable,
            '-r',
            '%s; echo(json_encode($xray->parse("%s")));' % (
                self.cmd_prefix,
                self.url
            )
        )

    def set_receive(self, target):
        """Prepare the command for webmention receiving against `target`.

        BUGFIX: the original format string had two placeholders but three
        arguments, which raised TypeError on every call. The target is now
        passed as the second argument to XRay's parse().
        NOTE(review): confirm the exact parse() signature (positional target
        vs. an options array) against the p3k/XRay documentation.
        """
        self.target = target
        self.cmd = (
            self.executable,
            '-r',
            '%s; echo(json_encode($xray->parse("%s", "%s")));' % (
                self.cmd_prefix,
                self.url,
                target
            )
        )
        return self

    def set_discover(self):
        """Prepare the command for rel= endpoint discovery on the URL."""
        self.cmd = (
            self.executable,
            '-r',
            '%s; echo(json_encode($xray->rels("%s")));' % (
                self.cmd_prefix,
                self.url,
            )
        )
        return self

    def parse(self):
        """Execute the prepared PHP command and return the decoded JSON."""
        logging.debug('pulling %s with XRay', self.url)
        p = subprocess.Popen(
            self.cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = p.communicate()
        if stderr:
            logging.error("Error with XRay: %s", stderr)
        return json.loads(stdout.decode('utf-8').strip())


class Pandoc(CMDLine):
    """ Pandoc command line call with piped in- and output """

    def __init__(self, md2html=True):
        """Select input/output formats.

        md2html=True  -> markdown to HTML5
        md2html='plain' -> markdown to plain text
        anything else -> HTML to markdown
        """
        super().__init__('pandoc')
        # the same extended markdown reader serves both markdown->X modes
        md_reader = "markdown+" + "+".join([
            'backtick_code_blocks',
            'auto_identifiers',
            'fenced_code_attributes',
            'definition_lists',
            'grid_tables',
            'pipe_tables',
            'strikeout',
            'superscript',
            'subscript',
            'markdown_in_html_blocks',
            'shortcut_reference_links',
            'autolink_bare_uris',
            'raw_html',
            'link_attributes',
            'header_attributes',
            'footnotes',
        ])
        if True == md2html:
            self.i = md_reader
            self.o = 'html5'
        elif 'plain' == md2html:
            self.i = md_reader
            self.o = "plain"
        else:
            # reverse direction: HTML back to markdown, without raw passthrough
            self.o = "markdown-" + "-".join([
                'raw_html',
                'native_divs',
                'native_spans',
            ])
            self.i = 'html'

    def convert(self, text):
        """Pipe `text` through pandoc and return the converted string."""
        cmd = (
            self.executable,
            '-o-',
            '--from=%s' % self.i,
            '--to=%s' % self.o
        )
        logging.debug('converting string with Pandoc')
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = p.communicate(input=text.encode())
        if stderr:
            logging.error(
                "Error during pandoc convert:\n\t%s\n\t%s",
                cmd,
                stderr
            )
        return stdout.decode('utf-8').strip()


class ExifTool(CMDLine):
    """Read a curated set of EXIF tags from a file via exiftool."""

    def __init__(self, fpath):
        self.fpath = fpath
        super().__init__('exiftool')

    @staticmethod
    def exifdate2iso(value):
        """ converts an EXIF date string to ISO 8601 format

        :param value: EXIF date (2016:05:01 00:08:24)
        :type arg1: str
        :return: ISO 8601 string with UTC timezone 2016-05-01T00:08:24+0000
        :rtype: str
        """
        if not isinstance(value, str):
            return value
        match = REGEX['exifdate'].match(value)
        if not match:
            # not a date-shaped value; pass through untouched
            return value
        return "%s-%s-%sT%s+0000" % (
            match.group('year'),
            match.group('month'),
            match.group('day'),
            match.group('time')
        )

    def read(self):
        """Run exiftool -json and return a dict of normalized EXIF values.

        ReleaseDate+ReleaseTime are merged into DateTimeRelease, then all
        EXIF-style dates are converted to ISO 8601.
        """
        cmd = (
            self.executable,
            '-sort',
            '-json',
            '-MIMEType',
            '-FileType',
            '-FileName',
            '-ModifyDate',
            '-CreateDate',
            '-DateTimeOriginal',
            '-ImageHeight',
            '-ImageWidth',
            '-Aperture',
            '-FOV',
            '-ISO',
            '-FocalLength',
            '-FNumber',
            '-FocalLengthIn35mmFormat',
            '-ExposureTime',
            '-Copyright',
            '-Artist',
            '-Model',
            '-GPSLongitude#',
            '-GPSLatitude#',
            '-LensID',
            '-LensSpec',
            '-Lens',
            '-ReleaseDate',
            '-Description',
            '-Headline',
            '-HierarchicalSubject',
            self.fpath
        )
        logging.debug('reading EXIF from %s', self.fpath)
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = p.communicate()
        if stderr:
            logging.error("Error reading EXIF:\n\t%s\n\t%s", cmd, stderr)
        # exiftool emits a JSON array with one object per file; we pass one file
        exif = json.loads(stdout.decode('utf-8').strip()).pop()
        if 'ReleaseDate' in exif and 'ReleaseTime' in exif:
            exif['DateTimeRelease'] = "%s %s" % (
                exif.get('ReleaseDate'), exif.get('ReleaseTime')[:8]
            )
            del exif['ReleaseDate']
            del exif['ReleaseTime']
        for k, v in exif.items():
            exif[k] = self.exifdate2iso(v)
        return exif


class TokenDB(object):
    """JSON-file-backed store for OAuth tokens and service->token mappings."""

    def __init__(self, uuid='tokens'):
        # `uuid` is accepted for backward compatibility but unused; the
        # path comes from config ([var] tokendb)
        self.db = config.get('var', 'tokendb')
        self.tokens = {}
        self.refresh()

    def refresh(self):
        """Reload tokens from disk; missing file leaves the store empty."""
        self.tokens = {}
        if os.path.isfile(self.db):
            with open(self.db, 'rt') as f:
                self.tokens = json.loads(f.read())

    def save(self):
        """Persist the token store as pretty-printed, key-sorted JSON."""
        with open(self.db, 'wt') as f:
            f.write(json.dumps(
                self.tokens, indent=4, sort_keys=True
            ))

    def get_token(self, token):
        """Return the stored entry for a token id, or None."""
        return self.tokens.get(token, None)

    def get_service(self, service):
        """Return the token id registered for a service name, or None."""
        token = self.tokens.get(service, None)
        return token

    def set_service(self, service, tokenid):
        """Map a service name to a token id and persist immediately."""
        self.tokens.update({
            service: tokenid
        })
        self.save()

    def update_token(self, token, oauth_token_secret=None, access_token=None, access_token_secret=None, verifier=None):
        """Merge any provided OAuth fields into the token's entry and save."""
        t = self.tokens.get(token, {})
        if oauth_token_secret:
            t.update({
                'oauth_token_secret': oauth_token_secret
            })
        if access_token:
            t.update({
                'access_token': access_token
            })
        if access_token_secret:
            t.update({
                'access_token_secret': access_token_secret
            })
        if verifier:
            t.update({
                'verifier': verifier
            })
        self.tokens.update({
            token: t
        })
        self.save()

    def clear(self):
        """Drop all tokens and persist the empty store."""
        self.tokens = {}
        self.save()

    def clear_service(self, service):
        """Remove a service mapping and its token entry, then save.

        Robustness fix: the original unguarded `del` raised KeyError when
        either key was already absent; pop(..., None) is tolerant.
        """
        t = self.tokens.get(service)
        if t:
            self.tokens.pop(t, None)
            self.tokens.pop(service, None)
        self.save()


class SearchDB(object):
    """SQLite FTS5-backed full-text search index over site content."""

    tmplfile = 'Search.html'

    def __init__(self):
        self.db = sqlite3.connect(
            "%s" % config.get('var', 'searchdb')
        )
        cursor = self.db.cursor()
        cursor.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS data USING FTS5(
            id,
            corpus,
            mtime,
            url,
            category,
            title
        )''')
        self.db.commit()

    def __exit__(self):
        self.finish()

    def finish(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    def append(self, id, corpus, mtime, url, category, title):
        """Insert or replace a document in the index.

        FTS5 virtual tables have no ON CONFLICT support, hence the
        delete-then-insert upsert emulation.
        """
        mtime = int(mtime)
        logging.debug("adding %s to searchdb", id)
        cursor = self.db.cursor()
        cursor.execute('''DELETE FROM data WHERE id=?''', (id,))
        cursor.execute('''INSERT OR IGNORE INTO data (id, corpus, mtime, url, category, title) VALUES (?,?,?,?,?,?);''', (
            id,
            corpus,
            mtime,
            url,
            category,
            title
        ))
        self.db.commit()

    def is_uptodate(self, fname, mtime):
        """Return True if `fname` is indexed with exactly this mtime."""
        mtime = int(mtime)
        cursor = self.db.cursor()
        cursor.execute(
            '''SELECT mtime FROM data WHERE id = ? AND mtime = ?''',
            (fname, mtime)
        )
        rows = cursor.fetchall()
        if len(rows):
            logging.debug("%s is up to date in searchdb", fname)
            return True
        logging.debug("%s is out of date in searchdb", fname)
        return False

    def search_by_query(self, query):
        """Run an FTS5 MATCH query; return {category: {id: {url,title,txt}}}."""
        ret = {}
        cursor = self.db.cursor()
        cursor.execute('''SELECT
            id, category, url, title,
            snippet(data, 1, '', '', '[...]', 24)
        FROM data
        WHERE data MATCH ?
        ORDER BY category, rank;''', (query,))
        rows = cursor.fetchall()
        for r in rows:
            r = {
                'id': r[0],
                'category': r[1],
                'url': r[2],
                'title': r[3],
                'txt': r[4],
            }
            category = r.get('category')
            if category not in ret:
                ret.update({category: {}})
            # file-path resolution is currently disabled; entries carry no
            # 'fpath' key, so consumers reading it get None
            maybe_fpath = os.path.join(
                config.get('dirs', 'content'),
                category,
                "%s.*" % r.get('id')
            )
            #fpath = glob.glob(maybe_fpath).pop()
            ret.get(category).update({
                r.get('id'): {
                    #'fpath': fpath,
                    'url': r.get('url'),
                    'title': r.get('title'),
                    'txt': r.get('txt')
                }
            })
        return ret

    def cli(self, query):
        """Print search results to stdout, grouped by category.

        NOTE(review): 'fpath' is never populated by search_by_query, so the
        first line per hit prints None — confirm whether fpath resolution
        should be re-enabled.
        """
        results = self.search_by_query(query)
        for c, items in sorted(results.items()):
            print("%s:" % c)
            for fname, data in sorted(items.items()):
                print(" %s" % data.get('fpath'))
                print(" %s" % data.get('url'))
                print("")

    def html(self, query):
        """Render search results through the Search.html Jinja2 template."""
        tmplvars = {
            'results': self.search_by_query(query),
            'term': query
        }
        return j2.get_template(self.tmplfile).render(tmplvars)


class WebmentionQueue(object):
    """SQLite-backed queue of webmentions awaiting processing."""

    def __init__(self):
        self.db = sqlite3.connect(
            "%s" % config.get('var', 'webmentiondb')
        )
        cursor = self.db.cursor()
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS `queue` (
            `id` INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
            `timestamp` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            `source` TEXT NOT NULL,
            `target` TEXT NOT NULL,
            `status` INTEGER NOT NULL DEFAULT 0,
            `mtime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        ''')
        self.db.commit()

    def __exit__(self):
        self.finish()

    def finish(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    def queue(self, source, target):
        """Insert a source->target webmention; return the new row id."""
        cursor = self.db.cursor()
        cursor.execute(
            '''INSERT INTO queue (source,target) VALUES (?,?);''',
            (
                source,
                target
            )
        )
        r = cursor.lastrowid
        self.db.commit()
        return r

    def get_queued(self, fname=None):
        """Return unprocessed (status=0) entries whose target contains `fname`.

        BUGFIX: the documented default fname=None used to raise TypeError
        when concatenated into the LIKE pattern; None now matches every
        pending entry.
        """
        logging.debug('getting queued webmentions for %s', fname)
        pattern = '%' if fname is None else '%' + fname + '%'
        ret = []
        cursor = self.db.cursor()
        cursor.execute(
            '''SELECT * FROM queue WHERE target LIKE ? AND status = 0''',
            (pattern,)
        )
        rows = cursor.fetchall()
        for r in rows:
            ret.append({
                'id': r[0],
                'dt': r[1],
                'source': r[2],
                'target': r[3],
            })
        return ret

    def entry_done(self, id):
        """Mark a queue entry as processed (status=1)."""
        logging.debug('setting %s webmention to done', id)
        cursor = self.db.cursor()
        cursor.execute("UPDATE queue SET status = 1 where ID=?", (id,))
        self.db.commit()


def __expandconfig():
    """Read config.ini, expand ~ in every value, compile the photo regex."""
    c = configparser.ConfigParser(
        interpolation=configparser.ExtendedInterpolation(),
        allow_no_value=True
    )
    c.read('config.ini')
    for s in c.sections():
        for o in c.options(s):
            curr = c.get(s, o)
            # the [photo] regex option is additionally compiled into REGEX
            if 'photo' == s and 'regex' == o:
                REGEX.update({'photo': re.compile(curr)})
            c.set(s, o, os.path.expanduser(curr))
    return c


def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
    """ Used to create short, lowercase slug for a number (an epoch) passed """
    num = int(num)
    # recursive base conversion; lstrip removes the leading zero produced
    # by the terminating (num == 0) branch
    return ((num == 0) and numerals[0]) or (
        baseN(
            num // b,
            b,
            numerals
        ).lstrip(numerals[0]) + numerals[num % b]
    )


def slugfname(url):
    """Return a slug (max 200 chars) of a URL, scheme and www stripped."""
    return "%s" % slugify(
        re.sub(r"^https?://(?:www)?", "", url),
        only_ascii=True,
        lower=True
    )[:200]


def __setup_sitevars():
    """Collect [site] and [author] config sections into a template dict."""
    SiteVars = {}
    section = 'site'
    for o in config.options(section):
        SiteVars.update({o: config.get(section, o)})

    # add site author
    section = 'author'
    SiteVars.update({section: {}})
    for o in config.options(section):
        SiteVars[section].update({o: config.get(section, o)})

    # add extra sections to author
    for sub in config.get('author', 'appendwith').split():
        SiteVars[section].update({sub: {}})
        for o in config.options(sub):
            SiteVars[section][sub].update({o: config.get(sub, o)})

    # push the whole thing into cache
    return SiteVars


def notify(msg):
    """Best-effort Telegram notification; silently a no-op if unconfigured."""
    # telegram notification, if set
    if not config.has_section('api_telegram'):
        return
    url = "https://api.telegram.org/bot%s/sendMessage" % (
        config.get('api_telegram', 'api_token')
    )
    data = {
        'chat_id': config.get('api_telegram', 'chat_id'),
        'text': msg
    }
    # fire and forget; narrowed from a bare except so that only network
    # failures are swallowed, not KeyboardInterrupt or programming errors
    try:
        requests.post(url, data=data)
    except requests.RequestException:
        pass


# arrow library format strings used when rendering timestamps
ARROWFORMAT = {
    'iso': 'YYYY-MM-DDTHH:mm:ssZ',
    'display': 'YYYY-MM-DD HH:mm',
    'rcf': 'ddd, DD MMM YYYY HH:mm:ss Z'
}

# symbolic name -> stdlib logging level number
LLEVEL = {
    'critical': 50,
    'error': 40,
    'warning': 30,
    'info': 20,
    'debug': 10
}

REGEX = {
    # NOTE(review): the named groups were reconstructed — this copy of the
    # pattern was mangled/truncated. The group names (year, month, day,
    # time) are required by ExifTool.exifdate2iso; confirm the time
    # sub-pattern against the original source.
    'exifdate': re.compile(
        r'^(?P<year>[0-9]{4}):(?P<month>[0-9]{2}):(?P<day>[0-9]{2})\s+'
        r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$'
    )
}