nasg/shared.py
Peter Molnar 70bd917de4 updated
2017-06-28 11:20:26 +00:00

270 lines
7 KiB
Python

import configparser
import os
import re
import glob
import logging
import subprocess
import json
from whoosh import fields
from whoosh import analysis
from slugify import slugify
def __expandconfig(config):
    """Add derived ``*dir`` path options to the parsed configuration.

    The expanded value of ``common.base`` is stored as ``common.basedir``.
    Every option ``X`` found in the ``source`` and ``target`` sections then
    gets a sibling option ``Xdir`` holding ``basedir`` joined with the value
    of ``X``. Finally ``target.filesdir`` and ``target.commentsdir`` are set
    to locations below ``target.builddir``.

    The parser is modified in place and returned.
    """
    root = os.path.expanduser(config.get('common', 'base'))
    config.set('common', 'basedir', root)

    for section in ('source', 'target'):
        # options() hands back a list, so adding options while looping
        # over it is safe.
        for name in config.options(section):
            relative = config.get(section, name)
            config.set(section, "%sdir" % name, os.path.join(root, relative))

    # These two live under the build directory rather than directly under
    # the base directory, so they replace whatever the loop derived.
    build_root = config.get('target', 'builddir')
    config.set(
        'target',
        'filesdir',
        os.path.join(build_root, config.get('source', 'files'))
    )
    config.set(
        'target',
        'commentsdir',
        os.path.join(build_root, config.get('site', 'commentspath'))
    )
    return config
def baseN(num, b=36, numerals="0123456789abcdefghijklmnopqrstuvwxyz"):
    """Return ``num`` written in base ``b`` using the digits in ``numerals``.

    Used to create a short, lowercase slug for a number (an epoch) passed.

    Args:
        num: the non-negative number to encode; anything ``int()`` accepts.
        b: the base, at least 2 and at most ``len(numerals)``.
        numerals: the digit alphabet, ``numerals[0]`` being zero.

    Returns:
        The encoded string without leading zero digits; ``numerals[0]``
        when ``num`` is 0.

    Raises:
        ValueError: if ``num`` is negative or ``b`` is smaller than 2. Both
            cases can never reach zero by repeated floor division, so they
            used to end in unbounded recursion.
    """
    num = int(num)
    if num < 0:
        raise ValueError("baseN() cannot encode a negative number: %d" % num)
    if b < 2:
        raise ValueError("baseN() needs a base of at least 2, got %r" % (b,))
    if num == 0:
        return numerals[0]

    # Collect digits least-significant first, then flip them.
    digits = []
    while num:
        num, remainder = divmod(num, b)
        digits.append(numerals[remainder])
    return "".join(reversed(digits))
def slugfname(url):
    """Build a slug from a URL, cut to at most 200 characters.

    The leading ``http://`` or ``https://`` and an optional ``www`` right
    after it are removed before the remainder is passed to ``slugify``.
    """
    # NOTE(review): the pattern drops "www" but not the dot that follows it,
    # and would also eat the first three letters of a host such as
    # "wwwhatever.com". Left as is because changing it would change the
    # slugs produced for such URLs.
    without_scheme = re.sub(r"^https?://(?:www)?", "", url)
    slug = slugify(without_scheme)
    return "%s" % slug[:200]
# The same ISO 8601 style timestamp layout, spelled out in two notations:
# token based (presumably for the arrow library - confirm against callers)
# and strftime/strptime based.
ARROWISO = 'YYYY-MM-DDTHH:mm:ssZ'
STRFISO = '%Y-%m-%dT%H:%M:%S%z'

# An http(s) URL preceded by at least one whitespace character; the leading
# whitespace is part of the match.
URLREGEX = re.compile(
    r'\s+https?\:\/\/?[a-zA-Z0-9\.\/\?\:@\-_=#]+'
    r'\.[a-zA-Z0-9\.\/\?\:@\-_=#]*'
)

# A "YYYY:MM:DD HH:MM:SS" timestamp with named groups year, month, day, time.
EXIFREXEG = re.compile(
    r'^(?P<year>[0-9]{4}):(?P<month>[0-9]{2}):(?P<day>[0-9]{2})\s+'
    r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$'
)

# A markdown image pointing below /files or /cache (optionally /YYYY/MM).
# Groups: 1 whole match, 2 alt text, 3 file name, 4 optional title,
# 5 optional content of a trailing {...} attribute block.
MDIMGREGEX = re.compile(
    r'(!\[(.*)\]\((?:\/(?:files|cache)'
    r'(?:\/[0-9]{4}\/[0-9]{2})?\/(.*\.(?:jpe?g|png|gif)))'
    r'(?:\s+[\'\"]?(.*?)[\'\"]?)?\)(?:\{(.*?)\})?)',
    flags=re.IGNORECASE
)
# Whoosh schema for the search index.
schema = fields.Schema(
    # unique ID of an entry
    url=fields.ID(stored=True, unique=True),
    title=fields.TEXT(stored=True, analyzer=analysis.FancyAnalyzer()),
    date=fields.DATETIME(stored=True, sortable=True),
    content=fields.TEXT(stored=True, analyzer=analysis.FancyAnalyzer()),
    # 4-character n-grams; indexed only, not stored
    fuzzy=fields.NGRAMWORDS(tokenizer=analysis.NgramTokenizer(4)),
    # comma separated, lowercased keywords
    tags=fields.TEXT(
        stored=True,
        analyzer=analysis.KeywordAnalyzer(lowercase=True, commas=True),
    ),
    # sortable but not stored
    weight=fields.NUMERIC(sortable=True),
    img=fields.TEXT(stored=True),
    mtime=fields.NUMERIC(stored=True),
)
# Module-wide configuration, loaded as a side effect of importing this module.
# 'config.ini' is a relative path, so it is looked up in the current working
# directory; ConfigParser.read() skips a file it cannot open, in which case
# the expansion step below fails on the missing sections.
config = configparser.ConfigParser(
    allow_no_value=True,
    interpolation=configparser.ExtendedInterpolation(),
)
config.read('config.ini')
# Add the derived "*dir" options (basedir, builddir, filesdir, ...).
config = __expandconfig(config)
class TokenDB(object):
    """OAuth token store kept in ``tokens.json`` inside the base directory.

    ``self.tokens`` is a plain dict holding two kinds of entries:

    * ``token -> {'oauth_token', 'oauth_token_secret'[, 'verifier']}``
    * ``service name -> token`` (a pointer to an entry of the first kind)

    The ``set_*`` methods only change the in-memory dict; call ``save()``
    to write the changes to disk.
    """

    def __init__(self):
        self.db = os.path.abspath(os.path.join(
            config.get('common', 'basedir'),
            'tokens.json'
        ))
        self.tokens = {}
        self.refresh()

    def refresh(self):
        """Reload the tokens from disk; a missing file leaves them as is."""
        if os.path.isfile(self.db):
            with open(self.db, 'rt') as f:
                self.tokens = json.load(f)

    def save(self):
        """Write the tokens to disk as indented, key-sorted JSON."""
        # Serialise before opening the file: opening in 'wt' mode truncates
        # it, so a serialisation error must not happen after that point.
        payload = json.dumps(self.tokens, indent=4, sort_keys=True)
        with open(self.db, 'wt') as f:
            f.write(payload)
        self.refresh()

    def get_token(self, token):
        """Return the record stored for ``token``, or None."""
        return self.tokens.get(token, None)

    def get_service(self, service):
        """Return the token record ``service`` points to, or None."""
        s = self.tokens.get(service, None)
        if s:
            s = self.get_token(s)
        return s

    def set_service(self, service, token):
        """Point ``service`` at ``token`` (in memory only)."""
        self.tokens.update({
            service: token
        })

    def set_token(self, token, secret):
        """Create or replace the record for ``token`` (in memory only)."""
        self.tokens.update({
            token: {
                'oauth_token': token,
                'oauth_token_secret': secret
            }
        })

    def set_verifier(self, token, verifier):
        """Attach ``verifier`` to the record of ``token`` (in memory only).

        Raises:
            KeyError: if no record exists for ``token``. This used to fail
                with an AttributeError on None instead.
        """
        try:
            record = self.tokens[token]
        except KeyError:
            raise KeyError(
                'cannot set verifier: unknown token %r' % (token,)
            ) from None
        record.update({
            'verifier': verifier
        })
# Shared module-level TokenDB instance, created when this module is imported.
tokendb = TokenDB()
class CMDLine(object):
    """Base class for wrappers around an executable found in PATH.

    Used as a context manager it keeps one child process open and talks to
    it over stdin/stdout with exiftool's ``-stay_open`` protocol: arguments
    are written one per line, ``-execute`` runs them, and the child answers
    with its output followed by ``sentinel``.
    """

    # End-of-output marker expected from the child after each ``-execute``.
    # NOTE(review): inferred from the exiftool-style arguments used below,
    # where "{ready}" plus a newline is printed; subclasses may override it.
    sentinel = "{ready}\n"

    def __init__(self, executable):
        """Locate ``executable`` in PATH.

        Raises:
            OSError: if it cannot be found.
        """
        self.executable = self._which(executable)
        if self.executable is None:
            raise OSError('No %s found in PATH!' % executable)

    @staticmethod
    def _which(name):
        """Return the first path matching ``name`` in PATH, or None."""
        search_path = os.environ.get('PATH')
        if search_path is None:
            return None
        for d in search_path.split(os.pathsep):
            which = glob.glob(os.path.join(d, name), recursive=True)
            if which:
                return which.pop()
        return None

    def __enter__(self):
        self.process = subprocess.Popen(
            [self.executable, "-stay_open", "True", "-@", "-"],
            universal_newlines=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.process.stdin.write("-stay_open\nFalse\n")
            self.process.stdin.flush()
        except OSError:
            # The child is already gone (broken pipe); nothing to tell it.
            pass
        # Close the pipes and reap the child so it is not left as a zombie.
        try:
            self.process.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            self.process.kill()
            self.process.communicate()

    def execute(self, *args):
        """Send ``args`` to the child and return what it prints in reply.

        Args:
            *args: argument strings, sent one per line and followed by
                ``-execute``.

        Returns:
            The child's output up to, but not including, ``sentinel``. If
            the child closes its output before sending the sentinel, a
            warning is logged and everything received so far is returned.
        """
        args = args + ("-execute\n",)
        self.process.stdin.write("\n".join(args))
        self.process.stdin.flush()

        marker = self.sentinel.encode('utf-8')
        fd = self.process.stdout.fileno()
        # Collect raw bytes and decode once at the end: decoding each chunk
        # separately loses characters split across a chunk boundary.
        raw = bytearray()
        while not raw.endswith(marker):
            chunk = os.read(fd, 4096)
            if not chunk:
                # EOF: the sentinel will never arrive, stop waiting for it.
                logging.warning(
                    "%s closed its output before sending the sentinel",
                    self.executable
                )
                return raw.decode('utf-8', errors='ignore')
            raw += chunk
        return raw[:len(raw) - len(marker)].decode('utf-8', errors='ignore')
class Pandoc(CMDLine):
    """ Pandoc command line call with piped in- and output """

    def __init__(self, md2html=True):
        """Pick the pandoc input and output formats.

        With ``md2html`` true (the default) the conversion goes from
        markdown, with a list of extensions enabled, to html5; otherwise
        from html to markdown with three extensions disabled.
        """
        super().__init__('pandoc')

        if not md2html:
            disabled = ('raw_html', 'native_divs', 'native_spans')
            self.i = 'html'
            self.o = "markdown-" + "-".join(disabled)
            return

        enabled = (
            'backtick_code_blocks',
            'auto_identifiers',
            'fenced_code_attributes',
            'definition_lists',
            'grid_tables',
            'pipe_tables',
            'strikeout',
            'superscript',
            'subscript',
            'markdown_in_html_blocks',
            'shortcut_reference_links',
            'autolink_bare_uris',
            'raw_html',
            'link_attributes',
            'header_attributes',
            'footnotes',
        )
        self.i = "markdown+" + "+".join(enabled)
        self.o = 'html5'

    def convert(self, text):
        """Pipe ``text`` through pandoc and return the stripped result.

        Anything pandoc writes to stderr is logged as an error; the
        converted stdout is returned either way.
        """
        source_format = '--from=%s' % self.i
        target_format = '--to=%s' % self.o
        # '-o-' sends the result to stdout
        cmd = (self.executable, '-o-', source_format, target_format)

        logging.debug('converting string with Pandoc')
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        converted, complaints = proc.communicate(input=text.encode())
        if complaints:
            logging.error(
                "Error during pandoc covert:\n\t%s\n\t%s",
                cmd,
                complaints
            )
        return converted.decode('utf-8').strip()