nasg/shared.py

656 lines
18 KiB
Python

import configparser
import os
import re
import glob
import logging
import subprocess
import json
import sqlite3
import requests
from slugify import slugify
import jinja2
class CMDLine(object):
def __init__(self, executable):
self.executable = self._which(executable)
if self.executable is None:
raise OSError('No %s found in PATH!' % executable)
return
@staticmethod
def _which(name):
for d in os.environ['PATH'].split(':'):
which = glob.glob(os.path.join(d, name), recursive=True)
if which:
return which.pop()
return None
class XRay(CMDLine):
cmd_prefix = 'chdir("/usr/local/lib/php/xray"); include("vendor/autoload.php"); $xray = new p3k\XRay();'
def __init__(self, url):
super().__init__('php')
self.url = url
self.target = ''
self.cmd = (
self.executable,
'-r',
'%s; echo(json_encode($xray->parse("%s")));' % (
self.cmd_prefix,
self.url
)
)
def set_receive(self, target):
self.cmd = (
self.executable,
'-r',
'%s; echo(json_encode($xray->parse("%s")));' % (
self.cmd_prefix,
self.url,
target
)
)
return self
def set_discover(self):
self.cmd = (
self.executable,
'-r',
'%s; echo(json_encode($xray->rels("%s")));' % (
self.cmd_prefix,
self.url,
)
)
return self
def parse(self):
logging.debug('pulling %s with XRay', self.url)
p = subprocess.Popen(
self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
if stderr:
logging.error("Error with XRay: %s", stderr)
return json.loads(stdout.decode('utf-8').strip())
class Pandoc(CMDLine):
""" Pandoc command line call with piped in- and output """
def __init__(self, md2html=True):
super().__init__('pandoc')
if True == md2html:
self.i = "markdown+" + "+".join([
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
])
self.o = 'html5'
elif 'plain' == md2html:
self.i = "markdown+" + "+".join([
'backtick_code_blocks',
'auto_identifiers',
'fenced_code_attributes',
'definition_lists',
'grid_tables',
'pipe_tables',
'strikeout',
'superscript',
'subscript',
'markdown_in_html_blocks',
'shortcut_reference_links',
'autolink_bare_uris',
'raw_html',
'link_attributes',
'header_attributes',
'footnotes',
])
self.o = "plain"
else:
self.o = "markdown-" + "-".join([
'raw_html',
'native_divs',
'native_spans',
])
self.i = 'html'
def convert(self, text):
cmd = (
self.executable,
'-o-',
'--from=%s' % self.i,
'--to=%s' % self.o
)
logging.debug('converting string with Pandoc')
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate(input=text.encode())
if stderr:
logging.error(
"Error during pandoc covert:\n\t%s\n\t%s",
cmd,
stderr
)
return stdout.decode('utf-8').strip()
class ExifTool(CMDLine):
def __init__(self, fpath):
self.fpath = fpath
super().__init__('exiftool')
@staticmethod
def exifdate2iso(value):
""" converts and EXIF date string to ISO 8601 format
:param value: EXIF date (2016:05:01 00:08:24)
:type arg1: str
:return: ISO 8601 string with UTC timezone 2016-05-01T00:08:24+0000
:rtype: str
"""
if not isinstance(value, str):
return value
match = REGEX['exifdate'].match(value)
if not match:
return value
return "%s-%s-%sT%s+0000" % (
match.group('year'),
match.group('month'),
match.group('day'),
match.group('time')
)
def read(self):
cmd = (
self.executable,
'-sort',
'-json',
'-MIMEType',
'-FileType',
'-FileName',
'-ModifyDate',
'-CreateDate',
'-DateTimeOriginal',
'-ImageHeight',
'-ImageWidth',
'-Aperture',
'-FOV',
'-ISO',
'-FocalLength',
'-FNumber',
'-FocalLengthIn35mmFormat',
'-ExposureTime',
'-Copyright',
'-Artist',
'-Model',
'-GPSLongitude#',
'-GPSLatitude#',
'-LensID',
'-LensSpec',
'-Lens',
'-ReleaseDate',
'-Description',
'-Headline',
'-HierarchicalSubject',
self.fpath
)
logging.debug('reading EXIF from %s', self.fpath)
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
if stderr:
logging.error("Error reading EXIF:\n\t%s\n\t%s", cmd, stderr)
exif = json.loads(stdout.decode('utf-8').strip()).pop()
if 'ReleaseDate' in exif and 'ReleaseTime' in exif:
exif['DateTimeRelease'] = "%s %s" % (
exif.get('ReleaseDate'), exif.get('ReleaseTime')[:8])
del(exif['ReleaseDate'])
del(exif['ReleaseTime'])
for k, v in exif.items():
exif[k] = self.exifdate2iso(v)
return exif
class BaseDB(object):
def __init__(self, fpath):
self.db = sqlite3.connect(fpath)
self.db.execute('PRAGMA auto_vacuum = INCREMENTAL;')
self.db.execute('PRAGMA journal_mode = MEMORY;')
self.db.execute('PRAGMA temp_store = MEMORY;')
self.db.execute('PRAGMA locking_mode = NORMAL;')
self.db.execute('PRAGMA synchronous = FULL;')
self.db.execute('PRAGMA encoding = "UTF-8";')
def __exit__(self):
self.finish()
def finish(self):
cursor = self.db.cursor()
cursor.execute('PRAGMA auto_vacuum;')
self.db.close()
# class TokenDBng(BaseDB):
# def __init__(self):
#self.fpath = config.get('var', 'tokendb')
# super().__init__(self.fpath)
#cursor = self.db.cursor()
# cursor.execute('''
# CREATE TABLE IF NOT EXISTS `tokens` (
#`service` TEXT PRIMARY KEY NOT NULL UNIQUE,
#`timestamp` TIMESTAMP NOT NULL DEFAULT (strftime('%s', 'now'))
#`oauth_token` TEXT NOT NULL,
#`oauth_token_secret` TEXT NOT NULL,
#`access_token` TEXT NOT NULL,
#`access_token_secret` TEXT NOT NULL,
#`verifier` TEXT NOT NULL
#);
#''')
# self.db.commit()
# TODO class SearchDBng(object):
# TODO class EXIFDBng(object):
class TokenDB(object):
def __init__(self, uuid='tokens'):
self.db = config.get('var', 'tokendb')
self.tokens = {}
self.refresh()
def refresh(self):
self.tokens = {}
if os.path.isfile(self.db):
with open(self.db, 'rt') as f:
self.tokens = json.loads(f.read())
def save(self):
with open(self.db, 'wt') as f:
f.write(json.dumps(
self.tokens, indent=4, sort_keys=True
))
def get_token(self, token):
return self.tokens.get(token, None)
def get_service(self, service):
token = self.tokens.get(service, None)
return token
def set_service(self, service, tokenid):
self.tokens.update({
service: tokenid
})
self.save()
def update_token(self,
token,
oauth_token_secret=None,
access_token=None,
access_token_secret=None,
verifier=None):
t = self.tokens.get(token, {})
if oauth_token_secret:
t.update({
'oauth_token_secret': oauth_token_secret
})
if access_token:
t.update({
'access_token': access_token
})
if access_token_secret:
t.update({
'access_token_secret': access_token_secret
})
if verifier:
t.update({
'verifier': verifier
})
self.tokens.update({
token: t
})
self.save()
def clear(self):
self.tokens = {}
self.save()
def clear_service(self, service):
t = self.tokens.get(service)
if t:
del(self.tokens[t])
del(self.tokens[service])
self.save()
class SearchDB(BaseDB):
tmplfile = 'Search.html'
def __init__(self):
self.fpath = "%s" % config.get('var', 'searchdb')
super().__init__(self.fpath)
cursor = self.db.cursor()
cursor.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS data USING FTS5(
id,
corpus,
mtime,
url,
category,
title,
tokenize = 'porter'
)''')
self.db.commit()
def __exit__(self):
self.finish()
def finish(self):
cursor = self.db.cursor()
cursor.execute('''PRAGMA auto_vacuum;''')
self.db.close()
def append(self, id, corpus, mtime, url, category, title):
mtime = int(mtime)
logging.debug("adding %s to searchdb", id)
cursor = self.db.cursor()
cursor.execute('''DELETE FROM data WHERE id=?''', (id,))
cursor.execute('''INSERT OR IGNORE INTO data (id, corpus, mtime, url, category, title) VALUES (?,?,?,?,?,?);''', (
id,
corpus,
mtime,
url,
category,
title
))
self.db.commit()
def is_uptodate(self, fname, mtime):
mtime = int(mtime)
ret = {}
cursor = self.db.cursor()
cursor.execute('''SELECT mtime
FROM data
WHERE id = ? AND mtime = ?''',
(fname, mtime)
)
rows = cursor.fetchall()
if len(rows):
logging.debug("%s is up to date in searchdb", fname)
return True
logging.debug("%s is out of date in searchdb", fname)
return False
def search_by_query(self, query):
ret = {}
cursor = self.db.cursor()
cursor.execute('''SELECT
id, category, url, title, snippet(data, 1, '', '', '[...]', 24)
FROM data
WHERE data MATCH ?
ORDER BY category, rank;''', (query,))
rows = cursor.fetchall()
for r in rows:
r = {
'id': r[0],
'category': r[1],
'url': r[2],
'title': r[3],
'txt': r[4],
}
category = r.get('category')
if category not in ret:
ret.update({category: {}})
maybe_fpath = os.path.join(
config.get('dirs', 'content'),
category,
"%s.*" % r.get('id')
)
#fpath = glob.glob(maybe_fpath).pop()
ret.get(category).update({
r.get('id'): {
#'fpath': fpath,
'url': r.get('url'),
'title': r.get('title'),
'txt': r.get('txt')
}
})
return ret
def cli(self, query):
results = self.search_by_query(query)
for c, items in sorted(results.items()):
print("%s:" % c)
for fname, data in sorted(items.items()):
print(" %s" % data.get('fpath'))
print(" %s" % data.get('url'))
print("")
def html(self, query):
tmplvars = {
'results': self.search_by_query(query),
'term': query
}
return j2.get_template(self.tmplfile).render(tmplvars)
class WebmentionQueue(BaseDB):
def __init__(self):
self.fpath = "%s" % config.get('var', 'webmentiondb')
super().__init__(self.fpath)
cursor = self.db.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS `queue` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
`timestamp` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`source` TEXT NOT NULL,
`target` TEXT NOT NULL,
`status` INTEGER NOT NULL DEFAULT 0,
`mtime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
''')
self.db.commit()
def __exit__(self):
self.finish()
def finish(self):
self.db.close()
def queue(self, source, target):
cursor = self.db.cursor()
cursor.execute(
'''INSERT INTO queue (source,target) VALUES (?,?);''', (
source,
target
)
)
r = cursor.lastrowid
self.db.commit()
return r
def get_queued(self, fname=None):
logging.debug('getting queued webmentions for %s', fname)
ret = []
cursor = self.db.cursor()
cursor.execute(
'''SELECT * FROM queue WHERE target LIKE ? AND status = 0''',
('%' +
fname +
'%',
))
rows = cursor.fetchall()
for r in rows:
ret.append({
'id': r[0],
'dt': r[1],
'source': r[2],
'target': r[3],
})
return ret
def entry_done(self, id):
logging.debug('setting %s webmention to done', id)
cursor = self.db.cursor()
cursor.execute("UPDATE queue SET status = 1 where ID=?", (id,))
self.db.commit()
def __expandconfig():
c = configparser.ConfigParser(
interpolation=configparser.ExtendedInterpolation(),
allow_no_value=True
)
c.read('config.ini')
for s in c.sections():
for o in c.options(s):
curr = c.get(s, o)
if 'photo' == s and 'regex' == o:
REGEX.update({'photo': re.compile(curr)})
c.set(s, o, os.path.expanduser(curr))
return c
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
""" Used to create short, lowercase slug for a number (an epoch) passed """
num = int(num)
return ((num == 0) and numerals[0]) or (
baseN(
num // b,
b,
numerals
).lstrip(numerals[0]) + numerals[num % b]
)
def slugfname(url):
return "%s" % slugify(
re.sub(r"^https?://(?:www)?", "", url),
only_ascii=True,
lower=True
)[:200]
def __setup_sitevars():
SiteVars = {}
section = 'site'
for o in config.options(section):
SiteVars.update({o: config.get(section, o)})
# TODO this should be a nice recursive function instead
# extra site section
for section in config.get('site', 'appendwith').split():
SiteVars.update({section: {}})
for o in config.options(section):
SiteVars[section].update({o: config.get(section, o)})
if not config.get(section, 'appendwith', fallback=False):
continue
# subsections
for sub in config.get(section, 'appendwith').split():
SiteVars[section].update({sub: {}})
for o in config.options(sub):
SiteVars[section][sub].update({o: config.get(sub, o)})
return SiteVars
def notify(msg):
# telegram notification, if set
if not config.has_section('api_telegram'):
return
url = "https://api.telegram.org/bot%s/sendMessage" % (
config.get('api_telegram', 'api_token')
)
data = {
'chat_id': config.get('api_telegram', 'chat_id'),
'text': msg
}
# fire and forget
try:
requests.post(url, data=data)
except BaseException:
pass
ARROWFORMAT = {
'iso': 'YYYY-MM-DDTHH:mm:ssZ',
'display': 'YYYY-MM-DD HH:mm',
'rcf': 'ddd, DD MMM YYYY HH:mm:ss Z',
'twitter': 'ddd MMM DD HH:mm:ss Z YYYY'
}
LLEVEL = {
'critical': 50,
'error': 40,
'warning': 30,
'info': 20,
'debug': 10
}
REGEX = {
'exifdate': re.compile(
r'^(?P<year>[0-9]{4}):(?P<month>[0-9]{2}):(?P<day>[0-9]{2})\s+'
r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$'
),
'cleanurl': re.compile(r"^https?://(?:www)?"),
'urls': re.compile(
r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
),
'mdimg': re.compile(
r'(?P<shortcode>\!\[(?P<alt>[^\]]+)\]\((?P<fname>[^\s]+)'
r'(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:\{(?P<css>[^\}]+)\})?)',
re.IGNORECASE
)
}
config = __expandconfig()
j2 = jinja2.Environment(
loader=jinja2.FileSystemLoader(
searchpath=config.get('dirs', 'tmpl')
),
lstrip_blocks=True
)
site = __setup_sitevars()