cleanup, step 1: removed files needs clearing up and added archive.py to replace pesos
Peter Molnar hello@petermolnar.eu
Tue, 24 Oct 2017 14:56:45 +0100
11 files changed,
811 insertions(+),
2172 deletions(-)
A
archive.py
@@ -0,0 +1,811 @@
"""Pull favourited images from Flickr, 500px, Tumblr and deviantART.

Each ``*Favs`` class lists the favourites of one service and hands every
entry to the matching ``*Fav`` class, which downloads the image into the
``archive``/``favorite`` directory from ``shared.config`` and writes the
collected metadata into the file with ``exiftool``.
"""

import os
import json
import requests
import glob
import logging
import shutil
import subprocess
import arrow

from requests_oauthlib import OAuth1Session, oauth1_session, OAuth2Session, oauth2_session
from oauthlib.oauth2 import BackendApplicationClient

import shared


class Favs(object):
    """Base class of the per-service favourite listers."""

    def __init__(self, confgroup):
        # confgroup doubles as the filename prefix of the archived files
        self.confgroup = confgroup

    @property
    def lastpulled(self):
        """Return the newest mtime (+1 second) among archived files of this
        service, or 1 when nothing has been archived yet."""
        pattern = os.path.join(
            shared.config.get('archive', 'favorite'),
            "%s-*" % self.confgroup
        )
        newest = max(
            (int(os.path.getmtime(f)) for f in glob.glob(pattern)),
            default=0
        )
        # +1 so the newest already-archived entry is not requested again
        mtime = newest + 1
        logging.debug("last %s fav timestamp: %s", self.confgroup, mtime)
        return mtime


class FlickrFavs(Favs):
    """Lists Flickr favourites through flickr.favorites.getList."""

    url = 'https://api.flickr.com/services/rest/'

    def __init__(self):
        super(FlickrFavs, self).__init__('flickr')
        self.get_uid()
        self.params = {
            'method': 'flickr.favorites.getList',
            'api_key': shared.config.get('api_flickr', 'api_key'),
            'user_id': self.uid,
            'extras': 'description,geo,tags,owner_name,date_upload,url_o,url_k,url_h,url_b,url_c,url_z',
            'per_page': 500,  # maximum
            'format': 'json',
            'nojsoncallback': '1',
            'min_fave_date': self.lastpulled
        }

    def get_uid(self):
        """Resolve the configured username into ``self.uid``."""
        params = {
            'method': 'flickr.people.findByUsername',
            'api_key': shared.config.get('api_flickr', 'api_key'),
            'format': 'json',
            'nojsoncallback': '1',
            'username': shared.config.get('api_flickr', 'username'),
        }
        r = requests.get(self.url, params=params)
        parsed = json.loads(r.text)
        self.uid = parsed.get('user', {}).get('id')

    def getpaged(self, offset):
        """Return the list of photo dicts on result page ``offset``."""
        logging.info('requesting page #%d of paginated results', offset)
        self.params.update({'page': offset})
        r = requests.get(self.url, params=self.params)
        parsed = json.loads(r.text)
        return parsed.get('photos', {}).get('photo', [])

    def run(self):
        """Collect every result page, then archive the not yet seen photos."""
        r = requests.get(self.url, params=self.params)
        js = json.loads(r.text).get('photos', {})

        photos = js.get('photo', [])
        total = int(js.get('pages', 1))
        current = int(js.get('page', 1))

        # the first response is page `current`; fetch the remaining ones
        for page in range(current + 1, total + 1):
            photos = photos + self.getpaged(page)

        for photo in photos:
            fav = FlickrFav(photo)
            if not fav.exists:
                fav.run()


class FivehpxFavs(Favs):
    """Lists the items of the 500px gallery of kind 5 of the authorised user."""

    def __init__(self):
        super(FivehpxFavs, self).__init__('500px')
        self.params = {
            'consumer_key': shared.config.get('api_500px', 'api_key'),
            'rpp': 100,  # maximum
            'image_size': 4,
            'include_tags': 1,
            'include_geo': 1,
            'sort': 'created_at',
            'sort_direction': 'desc'
        }
        self.oauth = FivehpxOauth()
        self.uid = None
        self.galid = None

    def get_uid(self):
        """Store the id of the authorised user in ``self.uid``."""
        r = self.oauth.request(
            'https://api.500px.com/v1/users',
            params={}
        )
        js = json.loads(r.text)
        self.uid = js.get('user', {}).get('id')

    def get_favgalid(self):
        """Store the id of the last returned kind-5 gallery in ``self.galid``."""
        r = self.oauth.request(
            'https://api.500px.com/v1/users/%s/galleries' % (self.uid),
            params={
                'kinds': 5  # see https://github.com/500px/api-documentation/blob/master/basics/formats_and_terms.md#gallery-kinds
            }
        )
        js = json.loads(r.text)
        g = js.get('galleries', []).pop()
        self.galid = g.get('id')

    @property
    def url(self):
        """Items endpoint of the gallery; needs ``uid`` and ``galid`` set."""
        return 'https://api.500px.com/v1/users/%s/galleries/%s/items' % (
            self.uid,
            self.galid
        )

    def getpaged(self, offset):
        """Return the list of photo dicts on result page ``offset``."""
        logging.info('requesting page #%d of paginated results', offset)
        self.params.update({'page': offset})
        r = requests.get(self.url, params=self.params)
        parsed = json.loads(r.text)
        # a response without 'photos' must not break the list concatenation
        return parsed.get('photos') or []

    def run(self):
        """Collect every result page, then archive the not yet seen photos."""
        self.get_uid()
        self.get_favgalid()

        r = requests.get(self.url, params=self.params)
        js = json.loads(r.text)
        photos = js.get('photos') or []

        total = int(js.get('total_pages', 1))
        current = int(js.get('current_page', 1))

        for page in range(current + 1, total + 1):
            photos = photos + self.getpaged(page)

        for photo in photos:
            fav = FivehpxFav(photo)
            if not fav.exists:
                fav.run()


class TumblrFavs(Favs):
    """Lists the liked posts of the authorised Tumblr user."""

    url = 'https://api.tumblr.com/v2/user/likes'

    def __init__(self):
        super(TumblrFavs, self).__init__('tumblr')
        self.oauth = TumblrOauth()
        self.params = {
            'after': self.lastpulled
        }
        self.likes = []

    def getpaged(self, offset):
        """Return the parsed response of the likes starting at ``offset``."""
        r = self.oauth.request(
            self.url,
            params={'offset': offset}
        )
        return json.loads(r.text)

    def run(self):
        """Collect the likes in steps of 20 and archive the unseen ones."""
        r = self.oauth.request(self.url, params=self.params)

        js = json.loads(r.text)
        total = int(js.get('response', {}).get('liked_count', 20))
        likes = js.get('response', {}).get('liked_posts', [])

        # the first response covers offsets 0-19; keep stepping by 20
        for offset in range(20, total, 20):
            paged = self.getpaged(offset)
            likes = likes + paged.get('response', {}).get('liked_posts', [])

        self.likes = likes
        for like in self.likes:
            fav = TumblrFav(like)
            if not fav.exists:
                fav.run()


class DAFavs(Favs):
    """Lists the deviations in the 'Featured' collection folder of a user."""

    def __init__(self):
        super(DAFavs, self).__init__('deviantart')
        # no trailing comma here: it would make username a 1-tuple
        self.username = shared.config.get('api_deviantart', 'username')
        self.oauth = DAOauth()
        self.likes = []
        self.galid = None
        self.params = {
            'limit': 24,  # this is the max as far as I can tell
            'mature_content': 'true',
            'username': self.username
        }

    def get_favgalid(self):
        """Store the folderid of the folder named 'Featured' in ``self.galid``."""
        r = self.oauth.request(
            'https://www.deviantart.com/api/v1/oauth2/collections/folders',
            params={
                'username': self.username,
                'calculate_size': 'false',
                'ext_preload': 'false',
                'mature_content': 'true'
            }
        )
        js = json.loads(r.text)
        for g in js.get('results', []):
            if 'Featured' == g.get('name'):
                self.galid = g.get('folderid')
                break

    @property
    def url(self):
        """Endpoint of the collection folder; needs ``galid`` set."""
        return 'https://www.deviantart.com/api/v1/oauth2/collections/%s' % (self.galid)

    def getpaged(self, offset):
        """Return the parsed response of the folder starting at ``offset``."""
        self.params.update({'offset': offset})
        r = self.oauth.request(self.url, self.params)
        return json.loads(r.text)

    def getsinglemeta(self, daid):
        """Return the metadata dict of deviation ``daid``.

        When the response cannot be parsed an empty dict is returned; when it
        parses but holds no usable 'metadata' list, the parsed response itself
        is returned.
        """
        r = self.oauth.request(
            'https://www.deviantart.com/api/v1/oauth2/deviation/metadata',
            params={
                'deviationids[]': daid,
                'ext_submission': False,
                'ext_camera': False,
                'ext_stats': False,
                'ext_collection': False,
                'mature_content': True,
            }
        )
        meta = {}
        try:
            meta = json.loads(r.text)
            return meta.get('metadata', []).pop()
        except (ValueError, IndexError, AttributeError) as e:
            logging.warning('no usable metadata for deviation %s: %s', daid, e)
            return meta

    def has_more(self, q):
        """Normalise the API's has_more flag (bool or string) into a bool."""
        return q in (True, 'True', 'true')

    def run(self):
        """Collect every page of the folder, then archive the unseen items."""
        self.get_favgalid()

        r = self.oauth.request(self.url, self.params)

        js = json.loads(r.text)
        favs = js.get('results', [])
        has_more = self.has_more(js.get('has_more'))
        offset = js.get('next_offset')
        # stop on a missing or zero offset as well, it cannot address a page
        while has_more and offset:
            logging.info('iterating over DA results with offset %d', offset)
            paged = self.getpaged(offset)
            new = paged.get('results', [])
            if not new:
                break
            favs = favs + new
            has_more = self.has_more(paged.get('has_more'))
            # next_offset is already the absolute offset of the next page;
            # adding it to the current offset would skip results
            nxt = paged.get('next_offset')
            offset = int(nxt) if nxt else None

        self.favs = favs
        for fav in self.favs:
            f = DAFav(fav)
            if f.exists:
                continue

            f.fav.update({'meta': self.getsinglemeta(fav.get('deviationid'))})
            f.run()


class ImgFav(object):
    """A single image to download and tag; subclasses fill in the details.

    ``self.target`` (the destination path) is expected to be set by the
    subclass or by the caller before ``exists``, ``pull_image`` or
    ``write_exif`` are used.
    """

    def __init__(self):
        self.imgurl = ''
        self.meta = {
            'dt': arrow.utcnow(),
            'title': '',
            'favorite-of': '',
            'tags': [],
            'geo': {
                'latitude': '',
                'longitude': '',
            },
            'author': {
                'name': '',
                'url': '',
            },
        }
        self.content = ''

    @property
    def exists(self):
        """True when the target file is already on disk."""
        return os.path.exists(self.target)

    def pull_image(self):
        """Download ``self.imgurl`` into ``self.target``.

        Nothing is written when there is no URL or the response status is
        not 200; both cases are logged.
        """
        if not self.imgurl:
            logging.error("no image url for %s, nothing to pull", self.target)
            return

        logging.info("pulling image %s to %s", self.imgurl, self.target)
        r = requests.get(self.imgurl, stream=True)
        if r.status_code != 200:
            logging.error(
                "pulling %s failed with HTTP status %s",
                self.imgurl,
                r.status_code
            )
            return

        with open(self.target, 'wb') as f:
            # let urllib3 undo gzip/deflate transfer encoding while copying
            r.raw.decode_content = True
            shutil.copyfileobj(r.raw, f)

    def write_exif(self):
        """Write ``self.meta`` and ``self.content`` into the target file by
        calling ``exiftool``; skipped when the target does not exist."""
        if not os.path.exists(self.target):
            logging.warning('%s is missing, not writing EXIF', self.target)
            return

        logging.info('populating EXIF data of %s', self.target)
        # 'tags' may be present but None, so `or []` instead of a default
        tags = list(set(self.meta.get('tags') or []))
        dt = self.meta.get('dt').to('utc')

        geo_lat = False
        geo_lon = False
        geo = self.meta.get('geo') or {}
        lat = geo.get('latitude', None)
        lon = geo.get('longitude', None)
        if lat and lon and 'null' != lat and 'null' != lon:
            geo_lat = lat
            geo_lon = lon

        author = self.meta.get('author') or {}
        author_name = "%s" % author.get('name', '')
        author_url = "%s" % author.get('url', '')

        params = [
            'exiftool',
            '-overwrite_original',
            '-EXIF:Artist=%s' % author_name[:64],
            '-XMP:Copyright=Copyright %s %s (%s)' % (
                dt.format('YYYY'),
                author_name,
                author_url,
            ),
            '-XMP:Source=%s' % self.meta.get('favorite-of'),
            '-XMP:ReleaseDate=%s' % dt.format('YYYY:MM:DD HH:mm:ss'),
            '-XMP:Headline=%s' % self.meta.get('title'),
            '-XMP:Description=%s' % self.content,
        ]
        for t in tags:
            params.append('-XMP:HierarchicalSubject+=%s' % t)
            params.append('-XMP:Subject+=%s' % t)

        if geo_lat and geo_lon:
            geo_lat = round(float(geo_lat), 6)
            geo_lon = round(float(geo_lon), 6)

            # the sign goes into the *Ref tags, the values are absolute
            GPSLatitudeRef = 'S' if geo_lat < 0 else 'N'
            GPSLongitudeRef = 'W' if geo_lon < 0 else 'E'

            params.append('-GPSLongitude=%s' % abs(geo_lon))
            params.append('-GPSLatitude=%s' % abs(geo_lat))
            params.append('-GPSLongitudeRef=%s' % GPSLongitudeRef)
            params.append('-GPSLatitudeRef=%s' % GPSLatitudeRef)
        params.append(self.target)

        p = subprocess.Popen(
            params,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = p.communicate()
        if p.returncode:
            logging.error(
                'exiftool exited with %s for %s: %s',
                p.returncode,
                self.target,
                stderr
            )

        # remove the backup copy in case exiftool left one behind
        _original = '%s_original' % self.target
        if os.path.exists(_original):
            os.unlink(_original)


class FlickrFav(ImgFav):
    """One Flickr photo, as returned by flickr.favorites.getList."""

    url = 'https://api.flickr.com/services/rest/'

    # the bigger the better, see
    # https://www.flickr.com/services/api/misc.urls.html
    sizes = ('url_o', 'url_k', 'url_h', 'url_b', 'url_c', 'url_z')

    def __init__(self, photo):
        super(FlickrFav, self).__init__()
        self.photo = photo
        self.ownerid = photo.get('owner')
        self.photoid = photo.get('id')
        self.url = "https://www.flickr.com/photos/%s/%s" % (self.ownerid, self.photoid)
        self.target = os.path.join(
            shared.config.get('archive', 'favorite'),
            "flickr-%s-%s.jpg" % (self.ownerid, self.photoid)
        )

    def run(self):
        """Download the largest offered size and tag it."""
        if self.exists:
            logging.warning("%s already exists, skipping", self.target)
            return

        # first size key present in the response, largest first
        img = next(
            (self.photo[size] for size in self.sizes if size in self.photo),
            False
        )

        if not img:
            logging.error("image url was empty for %s, skipping fav", self.url)
            return

        self.imgurl = img
        self.pull_image()
        self.meta = {
            'dt': arrow.get(
                self.photo.get('date_faved', arrow.utcnow().timestamp)
            ),
            'title': '%s' % shared.Pandoc('plain').convert(
                self.photo.get('title', '')
            ).rstrip(),
            'favorite-of': self.url,
            # split() without argument: no [''] for an empty tag string
            'tags': self.photo.get('tags', '').split(),
            'geo': {
                'latitude': self.photo.get('latitude', ''),
                'longitude': self.photo.get('longitude', ''),
            },
            'author': {
                'name': self.photo.get('ownername'),
                'url': 'https://www.flickr.com/people/%s' % (
                    self.photo.get('owner')
                ),
            },
        }

        self.content = shared.Pandoc('plain').convert(
            self.photo.get('description', {}).get('_content', '')
        )

        self.write_exif()


class FivehpxFav(ImgFav):
    """One 500px photo, as returned by the gallery items endpoint."""

    def __init__(self, photo):
        super(FivehpxFav, self).__init__()
        self.photo = photo
        self.ownerid = photo.get('user_id')
        self.photoid = photo.get('id')
        self.target = os.path.join(
            shared.config.get('archive', 'favorite'),
            "500px-%s-%s.jpg" % (self.ownerid, self.photoid)
        )
        self.url = "https://www.500px.com%s" % (photo.get('url'))

    def run(self):
        """Download the first listed image and tag it."""
        images = self.photo.get('images') or []
        img = images[0].get('url') if images else None
        if not img:
            logging.error("image url was empty for %s, skipping fav", self.url)
            return

        self.imgurl = img
        self.pull_image()

        user = self.photo.get('user')
        self.meta = {
            'dt': arrow.get(
                self.photo.get('created_at', arrow.utcnow().timestamp)
            ),
            'title': '%s' % shared.Pandoc('plain').convert(
                self.photo.get('name', '')
            ).rstrip(),
            'favorite-of': self.url,
            'tags': self.photo.get('tags', []),
            'geo': {
                'latitude': self.photo.get('latitude', ''),
                'longitude': self.photo.get('longitude', ''),
            },
            'author': {
                'name': user.get('fullname', self.ownerid),
                'url': 'https://www.500px.com/%s' % (
                    user.get('username', self.ownerid)
                ),
            },
        }
        c = "%s" % self.photo.get('description', '')
        self.content = shared.Pandoc('plain').convert(c)
        self.write_exif()


class DAFav(ImgFav):
    """One deviation, as returned by the collection folder endpoint."""

    def __init__(self, fav):
        super(DAFav, self).__init__()
        self.fav = fav
        self.deviationid = fav.get('deviationid')
        self.url = fav.get('url')
        self.title = fav.get('title', False) or self.deviationid
        self.author = self.fav.get('author').get('username')
        self.target = os.path.join(
            shared.config.get('archive', 'favorite'),
            "deviantart-%s-%s.jpg" % (
                shared.slugfname(self.title),
                shared.slugfname(self.author)
            )
        )
        # None when the entry carries no 'content'; pull_image skips it then
        self.imgurl = fav.get('content', {}).get('src')

    def run(self):
        """Download the image and tag it; expects ``self.fav['meta']`` to
        have been filled in by the caller for tags and description."""
        self.pull_image()

        self.meta = {
            'dt': arrow.get(
                self.fav.get('published_time', arrow.utcnow().timestamp)
            ),
            'title': '%s' % shared.Pandoc('plain').convert(self.title).rstrip(),
            'favorite-of': self.url,
            'tags': [t.get('tag_name') for t in self.fav.get('meta', {}).get('tags', [])],
            'author': {
                'name': self.author,
                'url': 'https://%s.deviantart.com' % (self.author),
            },
        }
        c = "%s" % self.fav.get('meta', {}).get('description', '')
        self.content = shared.Pandoc('plain').convert(c)
        self.write_exif()


class TumblrFav(object):
    """One liked Tumblr post; every photo of it is stored as its own file."""

    def __init__(self, like):
        self.like = like
        self.blogname = like.get('blog_name')
        self.postid = like.get('id')
        # template name; the real files get a _<index> suffix before .jpg
        self.target = os.path.join(
            shared.config.get('archive', 'favorite'),
            "tumblr-%s-%s.jpg" % (self.blogname, self.postid)
        )
        self.url = like.get('post_url')
        self.images = []

    @property
    def exists(self):
        """True when the first image of the post is already on disk."""
        return os.path.exists(self.target.replace('.jpg', '_0.jpg'))

    def run(self):
        """Download and tag every photo of the post."""
        content = "%s" % self.like.get('caption', '')

        # title fallbacks: summary, then slug, then slugified post URL
        title = self.like.get('summary', '').strip()
        if not title:
            title = self.like.get('slug', '').strip()
        if not title:
            title = shared.slugfname(self.like.get('post_url'))

        meta = {
            'dt': arrow.get(
                self.like.get(
                    'liked_timestamp',
                    self.like.get('date', arrow.utcnow().timestamp)
                )
            ),
            'title': title,
            'favorite-of': self.url,
            'tags': self.like.get('tags'),
            'author': {
                'name': self.like.get('blog_name'),
                'url': 'http://%s.tumblr.com' % self.like.get('blog_name')
            },
        }

        for icntr, p in enumerate(self.like.get('photos', [])):
            img = ImgFav()
            img.target = self.target.replace('.jpg', '_%d.jpg' % icntr)
            img.imgurl = p.get('original_size').get('url')
            img.content = content
            img.meta = meta
            img.pull_image()
            img.write_exif()


class Oauth2Flow(object):
    """OAuth2 client-credentials session for the service named ``service``."""

    token_url = ''

    def __init__(self, service):
        self.service = service
        self.key = shared.config.get("api_%s" % service, 'api_key')
        self.secret = shared.config.get("api_%s" % service, 'api_secret')
        client = BackendApplicationClient(
            client_id=self.key
        )
        client.prepare_request_body(scope=['browse'])
        oauth = OAuth2Session(client=client)
        token = oauth.fetch_token(
            token_url=self.token_url,
            client_id=self.key,
            client_secret=self.secret
        )
        self.client = OAuth2Session(
            self.key,
            token=token
        )

    def request(self, url, params=None):
        """GET ``url`` with the authorised session and return the response."""
        # None instead of a shared mutable {} default
        if params is None:
            params = {}
        return self.client.get(url, params=params)


class DAOauth(Oauth2Flow):
    token_url = 'https://www.deviantart.com/oauth2/token'

    def __init__(self):
        super(DAOauth, self).__init__('deviantart')


class Oauth1Flow(object):
    """OAuth1 flow whose tokens are kept in ``shared.TokenDB``."""

    request_token_url = ''
    access_token_url = ''
    authorize_url = ''

    def __init__(self, service):
        self.service = service
        self.key = shared.config.get("api_%s" % service, 'api_key')
        self.secret = shared.config.get("api_%s" % service, 'api_secret')
        self.tokendb = shared.TokenDB()
        self.t = self.tokendb.get_service(self.service)
        self.oauth_init()

    def oauth_init(self):
        """Make sure a request token and an access token are available."""
        if not self.t:
            self.request_oauth_token()

        t = self.tokendb.get_token(self.t)
        if not t.get('access_token', None) or not t.get('access_token_secret', None):
            self.request_access_token()

    def request_oauth_token(self):
        """Fetch a request token, store it, and wait until a verifier for it
        shows up in the token database."""
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            callback_uri="%s/oauth1/" % shared.config.get('site', 'url')
        )
        r = client.fetch_request_token(self.request_token_url)
        logging.debug('setting token to %s', r.get('oauth_token'))
        self.t = r.get('oauth_token')
        logging.debug('updating secret to %s', r.get('oauth_token_secret'))
        self.tokendb.update_token(
            self.t,
            oauth_token_secret=r.get('oauth_token_secret')
        )
        self.tokendb.set_service(
            self.service,
            self.t
        )

        existing = self.tokendb.get_token(self.t)
        verified = existing.get('verifier', None)
        while not verified:
            logging.debug('verifier missing for %s', self.t)
            # blocks on user input, then re-reads the token database
            self.auth_url(existing)
            self.tokendb.refresh()
            existing = self.tokendb.get_token(self.t)
            verified = existing.get('verifier', None)

    def auth_url(self, existing):
        """Print the authorisation URL and wait for the user to confirm."""
        t = self.tokendb.get_token(self.t)
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            resource_owner_key=self.t,
            resource_owner_secret=t.get('oauth_token_secret'),
            callback_uri="%s/oauth1/" % shared.config.get('site', 'url')
        )
        input('Visit: %s and press any key after' % (
            client.authorization_url(self.authorize_url)
        ))

    def request_access_token(self):
        """Exchange the verified request token for an access token; on denial
        the stored tokens of the service are cleared and the flow restarts."""
        try:
            t = self.tokendb.get_token(self.t)
            client = OAuth1Session(
                self.key,
                client_secret=self.secret,
                callback_uri="%s/oauth1/" % shared.config.get('site', 'url'),
                resource_owner_key=self.t,
                resource_owner_secret=t.get('oauth_token_secret'),
                verifier=t.get('verifier')
            )
            r = client.fetch_access_token(self.access_token_url)
            self.tokendb.update_token(
                self.t,
                access_token=r.get('oauth_token'),
                access_token_secret=r.get('oauth_token_secret')
            )
        except oauth1_session.TokenRequestDenied as e:
            logging.error('getting access token was denied, clearing former oauth tokens and re-running everyting')
            logging.debug('token request denial was: %s', e)
            self.tokendb.clear_service(self.service)
            # NOTE(review): re-read the service token the way __init__ does,
            # so a cleared service presumably restarts from a fresh request
            # token instead of retrying the denied one - confirm against
            # shared.TokenDB
            self.t = self.tokendb.get_service(self.service)
            self.oauth_init()

    def request(self, url, params):
        """GET ``url`` signed with the stored access token."""
        t = self.tokendb.get_token(self.t)
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            resource_owner_key=t.get('access_token'),
            resource_owner_secret=t.get('access_token_secret')
        )
        return client.get(url, params=params)


class FivehpxOauth(Oauth1Flow):
    request_token_url = 'https://api.500px.com/v1/oauth/request_token'
    access_token_url = 'https://api.500px.com/v1/oauth/access_token'
    authorize_url = 'https://api.500px.com/v1/oauth/authorize'

    def __init__(self):
        super(FivehpxOauth, self).__init__('500px')


class FlickrOauth(Oauth1Flow):
    request_token_url = 'https://www.flickr.com/services/oauth/request_token'
    access_token_url = 'https://www.flickr.com/services/oauth/access_token'
    authorize_url = 'https://www.flickr.com/services/oauth/authorize'

    def __init__(self):
        super(FlickrOauth, self).__init__('flickr')


class TumblrOauth(Oauth1Flow):
    request_token_url = 'https://www.tumblr.com/oauth/request_token'
    access_token_url = 'https://www.tumblr.com/oauth/access_token'
    authorize_url = 'https://www.tumblr.com/oauth/authorize'

    def __init__(self):
        super(TumblrOauth, self).__init__('tumblr')


if __name__ == '__main__':
    logging.basicConfig(level=10)

    flickr = FlickrFavs()
    flickr.run()

    fivehpx = FivehpxFavs()
    fivehpx.run()

    tumblr = TumblrFavs()
    tumblr.run()

    da = DAFavs()
    da.run()
D
envelope.py
@@ -1,193 +0,0 @@
-from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from email.mime.image import MIMEImage -from email.header import Header -import email.charset -from email.generator import Generator -from io import StringIO -import mimetypes -from email.mime.base import MIMEBase -from email.encoders import encode_base64 -import email.utils - -import time -import getpass -import socket -import shutil -import requests -import tempfile -import atexit -import os -import re -import smtplib -import logging -from shared import Pandoc - -class Letter(object): - def __init__(self, sender=None, recipient=None, subject='', text=''): - self.sender = sender or (getpass.getuser(), socket.gethostname()) - self.recipient = recipient or self.sender - - self.tmp = tempfile.mkdtemp( - 'envelope_', - dir=tempfile.gettempdir() - ) - atexit.register( - shutil.rmtree, - os.path.abspath(self.tmp) - ) - self.text = text; - self.subject = subject - self.images = [] - self.ready = None - self.time = time.time() - self.headers = {} - - @property - def _html(self): - return Pandoc().convert(self.text) - - @property - def _tmpl(self): - return "<html><head></head><body>%s</body></html>" % (self._html) - - def __pull_image(self, img): - fname = os.path.basename(img) - i = { - 'url': img, - 'name': fname, - 'tmp': os.path.join(self.tmp, fname), - } - - logging.debug("pulling image %s", i['url']) - r = requests.get(i['url'], stream=True) - if r.status_code == 200: - with open(i['tmp'], 'wb') as f: - logging.debug("writing image %s", i['tmp']) - r.raw.decode_content = True - shutil.copyfileobj(r.raw, f) - if not isinstance(self.images, list): - self.images = [] - self.images.append(i) - - - def __pull_images(self): - mdmatch = re.compile( - r'!\[.*\]\((.*?\.(?:jpe?g|png|gif)(?:\s+[\'\"]?.*?[\'\"]?)?)\)' - r'(?:\{.*?\})?' 
- ) - [self.__pull_image(img) for img in mdmatch.findall(self.text)] - - - def __attach_images(self): - self.__pull_images() - for i in self.images: - cid = 'cid:%s' % (i['name']) - logging.debug("replacing %s with %s", i['url'], cid) - self.text = self.text.replace(i['url'], cid) - - - def make(self, inline_images=True): - if inline_images: - self.__attach_images() - - - # Python, by default, encodes utf-8 in base64, which makes plain text - # mail painful; this overrides and forces Quoted Printable. - # Quoted Printable is still awful, but better, and we're going to - # force the mail to be 8bit encoded. - # Note: enforcing 8bit breaks compatibility with ancient mail clients. - email.charset.add_charset('utf-8', email.charset.QP, email.charset.QP, 'utf-8') - - mail = MIMEMultipart('alternative') - - # --- setting headers --- - self.headers = { - 'Subject': Header(re.sub(r"\r?\n?$", "", self.subject, 1), 'utf-8').encode(), - 'To': email.utils.formataddr(self.recipient), - 'From': email.utils.formataddr(self.sender), - 'Date': email.utils.formatdate(self.time, localtime=True) - } - - for k, v in self.headers.items(): - mail.add_header(k, "%s" % v) - logging.debug("headers: %s", self.headers) - - # --- adding plain text --- - text = self.text - _text = MIMEText(text, 'text', _charset='utf-8') - # --- - # this is the part where we overwrite the way Python thinks: - # force the text to be the actual, unencoded, utf-8. - # Note:these steps breaks compatibility with ancient mail clients. 
- _text.replace_header('Content-Transfer-Encoding', '8bit') - _text.replace_header('Content-Type', 'text/plain; charset=utf-8') - _text.set_payload(self.text) - # --- - logging.debug("text: %s", _text) - mail.attach(_text) - - # --- HTML bit --- - # this is where it gets tricky: the HTML part should be a 'related' - # wrapper, in which the text and all the related images are sitting - _envelope = MIMEMultipart('related') - - - html = self._tmpl - _html = MIMEText(html, 'html', _charset='utf-8') - # --- - # see above under 'adding plain text' - _html.replace_header('Content-Transfer-Encoding', '8bit') - _html.replace_header('Content-Type', 'text/html; charset=utf-8') - _html.set_payload(html) - # --- - logging.debug("HTML: %s", _html) - _envelope.attach(_html) - - for i in self.images: - mimetype, encoding = mimetypes.guess_type(i['tmp']) - mimetype = mimetype or 'application/octet-stream' - mimetype = mimetype.split('/', 1) - attachment = MIMEBase(mimetype[0], mimetype[1]) - with open(i['tmp'], 'rb') as img: - attachment.set_payload(img.read()) - img.close() - os.unlink(i['tmp']) - - encode_base64(attachment) - attachment.add_header( - 'Content-Disposition', - 'inline', - filename=i['name'] - ) - attachment.add_header( - 'Content-ID', - '<%s>' % (i['name']) - ) - - _envelope.attach(attachment) - - # add the whole html + image pack to the mail - mail.attach(_envelope) - - str_io = StringIO() - g = Generator(str_io, False) - g.flatten(mail) - - self.ready = str_io.getvalue().encode('utf-8') - - def send(self): - if not self.ready: - logging.error('this mail is not ready') - return - - try: - s = smtplib.SMTP('127.0.0.1', 25) - # unless you do the encode, you'll get: - # File "/usr/local/lib/python3.5/smtplib.py", line 850, in sendmail - # msg = _fix_eols(msg).encode('ascii') - # UnicodeEncodeError: 'ascii' codec can't encode character '\xa0' in position 1073: ordinal not in range(128) - s.sendmail(self.headers['From'], self.headers['To'], self.ready) - s.quit() - 
except Exception as e: - logging.error('sending mail failed with error: %s', e)
D
micropub.py
@@ -1,286 +0,0 @@
-#!/usr/bin/env python3 - -import os -import asyncio -import uvloop -from sanic import Sanic -import sanic.response -from sanic.log import log as logging - -import os -import arrow -import frontmatter -import glob -import tempfile -from slugify import slugify -import glob -import shared -from nasg import BaseRenderable, Renderer, Singular -import requests -import urllib.parse - -class NewEntry(BaseRenderable): - metamap = { - 'summary': 'summary', - 'name': 'title', - 'in-reply-to': 'in-reply-to', - 'repost-of': 'repost-of', - 'bookmark-of': 'bookmark-of', - 'like-of': 'favorite-of', - } - - categorymap = { - 'in-reply-to': 'note', - 'repost-of': 'note', - 'bookmark-of': 'bookmark', - 'favorite-of': 'favorite' - } - - slugmap = [ - 'slug', - 'in-reply-to', - 'repost-of', - 'bookmark-of', - 'like-of', - 'title' - ] - - # needs self.mtime, self.target - - def __init__(self, request): - self.dt = arrow.utcnow() - self.fm = frontmatter.loads('') - self.request = request - self.response = sanic.response.text("Unhandled error", status=500) - logging.debug(request.form) - - def __try_adding_meta(self, lookfor, kname): - t = self.request.form.get(lookfor, None) - if t and len(t): - self.fm.metadata[kname] = self.request.form.get(lookfor) - - @property - def path(self): - return os.path.abspath(os.path.join( - shared.config.get('source', 'contentdir'), - self.category, - "%s.md" % self.fname - )) - - @property - def target(self): - targetdir = os.path.abspath(os.path.join( - shared.config.get('target', 'builddir'), - self.fname - )) - return os.path.join(targetdir, 'index.html') - - @property - def category(self): - category = 'note' - for meta, cname in self.categorymap.items(): - if meta in self.fm.metadata: - logging.debug('changing category to %s because we have %s', cname, meta) - category = cname - - if 'summary' in self.fm.metadata: - if 'IT' in self.fm.metada['tags'] or 'it' in self.fm.metada['tags']: - category = 'article' - logging.debug('changing category to %s', 
category) - if 'journal' in self.fm.metada['tags'] or 'journal' in self.fm.metada['tags']: - category = 'journal' - logging.debug('changing category to %s', category) - - - return category - - - @property - def existing_tags(self): - if hasattr(self, '_existing_tags'): - return self._existing_tags - - existing = glob.glob(os.path.join( - shared.config.get('target', 'builddir'), - "tag", - "*" - )); - - self._existing_tags = existing - return self._existing_tags - - - @property - def existing_slugs(self): - if hasattr(self, '_existing_slugs'): - return self._existing_slugs - - existing = [os.path.splitext(i)[0] for i in list(map( - os.path.basename, glob.glob( - os.path.join( - shared.config.get('source', 'contentdir'), - "*", - "*.md" - ) - ) - ))] - - self._existing_slugs = existing - return self._existing_slugs - - - @property - def fname(self): - if hasattr(self, '_slug'): - return self._slug - - slug = shared.baseN(self.dt.timestamp) - for maybe in self.slugmap: - val = self.request.form.get(maybe, None) - if not val: - continue - logging.debug('using %s for slug', maybe) - slug = shared.slugfname(val) - break - - self._slug = slug - return self._slug - - - @property - def exists(self): - if self.fname in self.existing_slugs: - logging.warning("slug already exists: %s", slug) - return True - return False - #inc = 1 - #while slug in slugs: - #slug = "%s-%d" % (slug, inc) - #inc = inc+1 - #logging.warning("Using %s as slug instead", slug) - - def run(self): - if not self.verify(): - return - - self.parse() - - if self.exists: - self.response = sanic.response.text( - "update is not yet supported", - status=401 - ) - return - - self.write() - #self.render() - - def verify(self): - if 'q' in self.request.args: - if 'config' in self.request.args['q']: - self.response = sanic.response.json({ - 'tags': self.existing_tags - }, status=200) - return - if 'syndicate-to' in self.request.args['q']: - self.response = sanic.response.json({ - 'syndicate-to': [] - }, status=200) 
- return - - if not 'access_token' in self.request.form: - self.response = sanic.response.text( - "Mising access token", - status=401 - ) - return - - token = self.request.form.get('access_token') - - verify = requests.get( - 'https://tokens.indieauth.com/token', - allow_redirects=False, - timeout=10, - headers={ - 'Content-Type': 'application/x-www-form-urlencoded', - 'Authorization': 'Bearer %s' % (token) - }); - - if verify.status_code != requests.codes.ok: - self.response = sanic.response.text( - "Could not verify access token", - status=500 - ) - return False - - response = urllib.parse.parse_qs(verify.text) - logging.debug(response) - if 'scope' not in response or 'me' not in response: - self.response = sanic.response.text( - "Could not verify access token 'me'", - status=401 - ) - return False - - if '%s/' % (shared.config.get('site','url').rstrip()) not in response['me']: - self.response = sanic.response.text( - "You can't post to this domain.", - status=401 - ) - return False - - if 'create' not in "%s" % response['scope']: - self.response = sanic.response.text( - "Invalid scope", - status=401 - ) - return False - return True - - def parse(self): - self.fm.metadata['published'] = self.dt.format(shared.ARROWISO) - - for lookfor, kname in self.metamap.items(): - self.__try_adding_meta(lookfor, kname) - - if self.request.form.get('content', None): - self.fm.content = self.request.form.get('content') - - if self.request.form.get('category[]', None): - self.fm.metadata['tags'] = list(self.request.form['category[]']) - - def write(self): - logging.info('writing incoming post to: %s', self.path) - with open (self.path, 'wt') as f: - f.write(frontmatter.dumps(self.fm)) - self.response = sanic.response.text( - "Created", - status=201 - ) - - #def render(self): - #singular = Singular(self.path) - #singular.render() - #self.response = sanic.response.text( - #"Post created", - #status = 201, - #headers = { - #'Location': "%s" % (singular.url) - #} - #) - #return - - - 
-if __name__ == '__main__': - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - app = Sanic() - - @app.route("/micropub", methods=["POST","GET"]) - async def mpub(request): - r = NewEntry(request) - r.run() - return r.response - - - app.run(host="127.0.0.1", port=8004, debug=True)
D
new.py
@@ -1,130 +0,0 @@
#!/usr/bin/env python3
"""Interactively assemble a new frontmatter entry and save it as markdown.

Every value can be given on the command line; anything missing is asked
for on stdin. The resulting document is written below the configured
content directory.
"""

import os
import sys
import arrow
import argparse
import frontmatter
import glob
import tempfile
from slugify import slugify
import shared

# argparse destination -> metadata key used in the written document
METAKEYS = {
    'repost': 'repost-of',
    'bookmark': 'bookmark-of',
    'reply': 'in-reply-to',
    'date': 'published',
}


def _split(value):
    """Turn a ';' separated string into a list; pass other values through."""
    if isinstance(value, str) and value:
        return value.split(';')
    return value


def _default_slug(args, now):
    """Suggest a slug from the title, a referenced URL, or the timestamp."""
    if args['title']:
        return slugify(args['title'], only_ascii=True, lower=True)
    for key in ('bookmark', 'reply', 'repost'):
        if args[key]:
            return slugify(
                "re: %s" % (args[key]),
                only_ascii=True,
                lower=True
            )
    return shared.baseN(now.timestamp)


def _unique_slug(slug, taken):
    """Append -1, -2, ... to slug until it is not among the taken slugs."""
    if slug not in taken:
        return slug
    # the value has to be formatted into the message: print() does not
    # interpolate %s from additional positional arguments
    print("This slug already exists: %s" % slug)
    inc = 1
    candidate = "%s-%d" % (slug, inc)
    while candidate in taken:
        inc = inc + 1
        candidate = "%s-%d" % (slug, inc)
    print("Using %s as slug" % candidate)
    return candidate


def main():
    """Collect the entry fields, build the document and write it to disk."""
    contentdir = shared.config.get('source', 'contentdir')

    # --- set params
    # existing slugs: markdown file names (without extension) one level
    # below the content directory
    slugs = {
        os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob(os.path.join(contentdir, "*", "*.md"))
    }
    # categories: the entries directly inside the content directory
    categories = [
        os.path.basename(path)
        for path in glob.glob(os.path.join(contentdir, "*"))
    ]

    now = arrow.utcnow()
    parser = argparse.ArgumentParser(description='create doc and print it to stdout')
    parser.add_argument('--tags', '-t', help='; separated, quoted list of tags')
    parser.add_argument('--date', '-d', help=' YYYY-mm-ddTHH:MM:SS+TZ formatted date, if not now')
    parser.add_argument('--slug', '-s', help='slug (normally autogenerated from title or pubdate)')
    parser.add_argument('--title', '-l', help='title of new entry')
    parser.add_argument('--bookmark', '-b', help='URL to bookmark')
    parser.add_argument('--reply', '-r', help='URL to reply to')
    parser.add_argument('--repost', '-p', help='URL to repost')
    parser.add_argument('--content', '-c', help='content of entry')
    parser.add_argument('--summary', '-u', help='summary of entry')
    parser.add_argument('--redirect', '-i', help='; separated, quoted list of redirects')
    args = vars(parser.parse_args())

    if not args['date']:
        d = now.format(shared.ARROWISO)
        args['date'] = input('Date [%s]: ' % (d)) or d

    if not args['title']:
        args['title'] = input('Title []: ') or ''

    if not args['tags']:
        args['tags'] = input('Tags (separated by ;) []: ') or None
    # split regardless of whether the value came from argv or the prompt
    args['tags'] = _split(args['tags'])

    if not args['bookmark']:
        args['bookmark'] = input('Bookmark of URL []: ') or ''

    if not args['reply']:
        args['reply'] = input('Reply to URL []: ') or ''

    if not args['repost']:
        args['repost'] = input('Repost of URL []: ') or ''

    if not args['slug']:
        suggestion = _default_slug(args, now)
        args['slug'] = input('Slug [%s]: ' % (suggestion)) or suggestion

    args['slug'] = _unique_slug(args['slug'], slugs)

    if not args['summary']:
        args['summary'] = input('Summary []: ') or ''

    if not args['content']:
        args['content'] = input('Content []: ') or ''

    if not args['redirect']:
        args['redirect'] = input('Additional slugs (separated by ;) []: ') or None
    args['redirect'] = _split(args['redirect'])

    # slug and content are not metadata: take them out before the rest of
    # the collected values becomes the frontmatter header
    slug = args.pop('slug')
    content = args.pop('content')

    for orig, new in METAKEYS.items():
        args[new] = args.pop(orig)

    doc = frontmatter.loads('')
    # empty values are left out of the header altogether
    doc.metadata = {k: v for k, v in args.items() if v}
    doc.content = content

    tmpsave = os.path.join(tempfile.gettempdir(), "%s.md" % slug)
    saveto = input('Save to: [%s]: ' % categories) or 'bookmark'

    # the answer is treated as a category directory name unless it is
    # exactly the temporary file path, which is then used as-is
    if tmpsave != saveto:
        saveto = os.path.join(contentdir, saveto, "%s.md" % slug)

    with open(saveto, 'wt') as f:
        f.write(frontmatter.dumps(doc))

    print("wrote file to:\n%s" % saveto)


if __name__ == '__main__':
    main()
D
oauth.py
@@ -1,295 +0,0 @@
#!/usr/bin/env python3
"""OAuth1/OAuth2 helpers and a small callback server storing verifiers."""

import asyncio
import uvloop
import os
import json
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
import shared
import requests
from requests_oauthlib import OAuth1Session, oauth1_session, OAuth2Session, oauth2_session
from oauthlib.oauth2 import BackendApplicationClient
import tempfile

from pprint import pprint


class TokenDB(object):
    """JSON file backed store for OAuth tokens.

    The same dictionary holds two kinds of entries: ``service -> token id``
    and ``token id -> dict of secrets/verifier``.
    """

    def __init__(self, uuid='tokens'):
        # NOTE(review): the file lives in the system temp directory and
        # contains token secrets - confirm this location is acceptable.
        self.db = os.path.abspath(os.path.join(
            tempfile.gettempdir(),
            "%s.json" % uuid
        ))
        self.tokens = {}
        self.refresh()

    def refresh(self):
        """Re-read the token file; an absent file yields an empty store."""
        self.tokens = {}
        if os.path.isfile(self.db):
            with open(self.db, 'rt') as f:
                self.tokens = json.loads(f.read())

    def save(self):
        """Write the store to disk and read it back."""
        with open(self.db, 'wt') as f:
            f.write(json.dumps(
                self.tokens, indent=4, sort_keys=True
            ))
        self.refresh()

    def get_token(self, token):
        """Return the record stored for a token id, or None."""
        return self.tokens.get(token, None)

    def get_service(self, service):
        """Return the token id registered for a service, or None."""
        return self.tokens.get(service, None)

    def set_service(self, service, tokenid):
        """Register tokenid as the current token of service and persist."""
        self.tokens.update({
            service: tokenid
        })
        self.save()

    def update_token(self,
                     token,
                     oauth_token_secret=None,
                     access_token=None,
                     access_token_secret=None,
                     verifier=None):
        """Merge the given (truthy) fields into the record of token."""
        fields = {
            'oauth_token_secret': oauth_token_secret,
            'access_token': access_token,
            'access_token_secret': access_token_secret,
            'verifier': verifier,
        }
        t = self.tokens.get(token, {})
        # falsy values never overwrite what is already stored
        t.update({k: v for k, v in fields.items() if v})
        self.tokens.update({
            token: t
        })
        self.save()

    def clear(self):
        """Drop every entry and persist the empty store."""
        self.tokens = {}
        self.save()

    def clear_service(self, service):
        """Remove a service entry together with the token record it names."""
        t = self.tokens.pop(service, None)
        if t:
            # the token record may already be gone; pop() with a default
            # avoids a KeyError in that case
            self.tokens.pop(t, None)
        self.save()


class Oauth2Flow(object):
    """OAuth2 client-credentials (backend application) flow."""

    token_url = ''

    def __init__(self, service):
        self.service = service
        self.key = shared.config.get(service, 'api_key')
        self.secret = shared.config.get(service, 'api_secret')
        client = BackendApplicationClient(
            client_id=self.key
        )
        client.prepare_request_body(scope=['browse'])
        oauth = OAuth2Session(client=client)
        token = oauth.fetch_token(
            token_url=self.token_url,
            client_id=self.key,
            client_secret=self.secret
        )
        self.client = OAuth2Session(
            self.key,
            token=token
        )

    def request(self, url, params=None):
        """GET url through the authenticated session."""
        # None instead of a shared mutable {} default
        return self.client.get(url, params=params or {})


class DAOauth(Oauth2Flow):
    """Oauth2Flow configured for deviantART."""

    token_url = 'https://www.deviantart.com/oauth2/token'

    def __init__(self):
        super(DAOauth, self).__init__('deviantart')


class Oauth1Flow(object):
    """Three-legged OAuth1 flow with tokens persisted in a TokenDB."""

    request_token_url = ''
    access_token_url = ''
    authorize_url = ''

    def __init__(self, service):
        self.service = service
        self.key = shared.config.get(service, 'api_key')
        self.secret = shared.config.get(service, 'api_secret')
        self.tokendb = TokenDB()
        self.t = self.tokendb.get_service(self.service)
        self.oauth_init()

    @property
    def callback_uri(self):
        """Callback URL handed to the provider."""
        return "%s/oauth1/" % shared.config.get('site', 'url')

    def oauth_init(self):
        """Make sure a request token and an access token pair exist."""
        if not self.t:
            self.request_oauth_token()

        # a service entry can point at a token record that is missing
        t = self.tokendb.get_token(self.t) or {}
        if not t.get('access_token', None) or not t.get('access_token_secret', None):
            self.request_access_token()

    def request_oauth_token(self):
        """Fetch a request token, then wait until a verifier is stored."""
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            callback_uri=self.callback_uri
        )
        r = client.fetch_request_token(self.request_token_url)
        logging.debug('setting token to %s', r.get('oauth_token'))
        self.t = r.get('oauth_token')
        logging.debug('updating secret to %s', r.get('oauth_token_secret'))
        self.tokendb.update_token(
            self.t,
            oauth_token_secret=r.get('oauth_token_secret')
        )
        self.tokendb.set_service(
            self.service,
            self.t
        )

        existing = self.tokendb.get_token(self.t)
        verified = existing.get('verifier', None)
        # the verifier is written to the token file by the /oauth1 callback
        # route; keep prompting and re-reading the file until it shows up
        while not verified:
            logging.debug('verifier missing for %s', self.t)
            self.auth_url(existing)
            self.tokendb.refresh()
            existing = self.tokendb.get_token(self.t)
            verified = existing.get('verifier', None)

    def auth_url(self, existing):
        """Show the authorization URL and block until the user confirms.

        The ``existing`` argument is accepted for compatibility and is not
        read; the token record is looked up again from the store.
        """
        t = self.tokendb.get_token(self.t)
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            resource_owner_key=self.t,
            resource_owner_secret=t.get('oauth_token_secret'),
            callback_uri=self.callback_uri
        )
        input('Visit: %s and press any key after' % (
            client.authorization_url(self.authorize_url)
        ))

    def request_access_token(self):
        """Exchange the verified request token for an access token pair.

        When the provider denies the request, the stored tokens of this
        service are dropped and the whole flow is started again.
        """
        try:
            t = self.tokendb.get_token(self.t) or {}
            client = OAuth1Session(
                self.key,
                client_secret=self.secret,
                callback_uri=self.callback_uri,
                resource_owner_key=self.t,
                resource_owner_secret=t.get('oauth_token_secret'),
                verifier=t.get('verifier')
            )
            r = client.fetch_access_token(self.access_token_url)
            self.tokendb.update_token(
                self.t,
                access_token=r.get('oauth_token'),
                access_token_secret=r.get('oauth_token_secret')
            )
        except oauth1_session.TokenRequestDenied:
            logging.error('getting access token was denied, clearing former oauth tokens and re-running everyting')
            self.tokendb.clear_service(self.service)
            # forget the stale token id as well, otherwise oauth_init()
            # would skip requesting a fresh request token
            self.t = None
            self.oauth_init()

    def request(self, url, params):
        """GET url signed with the stored access token pair."""
        t = self.tokendb.get_token(self.t)
        client = OAuth1Session(
            self.key,
            client_secret=self.secret,
            resource_owner_key=t.get('access_token'),
            resource_owner_secret=t.get('access_token_secret')
        )
        return client.get(url, params=params)


class FivehpxOauth(Oauth1Flow):
    """Oauth1Flow configured for 500px."""

    request_token_url = 'https://api.500px.com/v1/oauth/request_token'
    access_token_url = 'https://api.500px.com/v1/oauth/access_token'
    authorize_url = 'https://api.500px.com/v1/oauth/authorize'

    def __init__(self):
        super(FivehpxOauth, self).__init__('500px')


class FlickrOauth(Oauth1Flow):
    """Oauth1Flow configured for Flickr."""

    request_token_url = 'https://www.flickr.com/services/oauth/request_token'
    access_token_url = 'https://www.flickr.com/services/oauth/access_token'
    authorize_url = 'https://www.flickr.com/services/oauth/authorize'

    def __init__(self):
        super(FlickrOauth, self).__init__('flickr')


class TumblrOauth(Oauth1Flow):
    """Oauth1Flow configured for Tumblr."""

    request_token_url = 'https://www.tumblr.com/oauth/request_token'
    access_token_url = 'https://www.tumblr.com/oauth/access_token'
    authorize_url = 'https://www.tumblr.com/oauth/authorize'

    def __init__(self):
        super(TumblrOauth, self).__init__('tumblr')


if __name__ == '__main__':
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = Sanic()

    @app.route("/oauth1", methods=["GET"])
    async def oa(request):
        """Store the verifier the provider sends back for a request token."""
        token = request.args.get('oauth_token')
        verifier = request.args.get('oauth_verifier')
        tokendb = TokenDB()
        tokendb.update_token(
            token,
            verifier=verifier
        )
        return sanic.response.text(
            "OK",
            status=200
        )

    app.run(host="127.0.0.1", port=8006, debug=True)
D
offlinecopies.py
@@ -1,154 +0,0 @@
"""Save offline copies of bookmarked URLs with wget."""

import glob
import os
import logging
import json
import subprocess
import frontmatter
import requests
from urllib.parse import urlparse, urlunparse
import shared


# remove the rest of the potential loggers
while len(logging.root.handlers) > 0:
    logging.root.removeHandler(logging.root.handlers[-1])

# --- set loglevel
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def find_realurl(url):
    """Follow redirects for url and strip utm_* query parameters.

    Returns a ``(final_url, status_code)`` tuple; ``(None, 400)`` when the
    request itself failed.
    """
    headers = requests.utils.default_headers()
    headers.update({
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0',
    })

    try:
        r = requests.get(
            url,
            allow_redirects=True,
            timeout=60,
            headers=headers
        )
    except Exception as e:
        logging.error('getting real url failed: %s', e)
        return (None, 400)

    finalurl = list(urlparse(r.url))
    # index 4 of the parsed tuple is the query string
    finalurl[4] = '&'.join(
        [x for x in finalurl[4].split('&') if not x.startswith('utm_')])
    finalurl = urlunparse(finalurl)

    return (finalurl, r.status_code)


def find_archiveorgurl(url):
    """Return a fetchable URL for url: the live one or an archive.org copy.

    The resolved URL is returned when it answers with 200. Otherwise the
    archive.org availability endpoint is queried and the closest snapshot
    URL is returned, or None when nothing could be found.
    """
    realurl, status = find_realurl(url)
    if status == requests.codes.ok:
        return realurl

    # when resolving failed there is no final URL: look up the one we were
    # given instead of the literal string "None"
    lookup = realurl or url

    try:
        # params= makes requests escape the looked-up URL properly
        a = requests.get(
            "http://archive.org/wayback/available",
            params={'url': lookup},
            timeout=60
        )
    except Exception as e:
        logging.error(
            'Failed to fetch archive.org availability for %s: %s',
            lookup,
            e
        )
        return None

    # a Response is falsy for 4xx/5xx status codes
    if not a:
        logging.error('empty archive.org availability for %s', lookup)
        return None

    try:
        a = json.loads(a.text)
        aurl = a.get(
            'archived_snapshots', {}
        ).get(
            'closest', {}
        ).get(
            'url', None
        )
        if aurl:
            logging.debug("found %s in archive.org for %s", aurl, lookup)
            return aurl
    except Exception as e:
        logging.error("archive.org parsing failed: %s", e)

    return None


class wget(shared.CMDLine):
    """Download a page with its requisites into the offline copies dir."""

    def __init__(self, url, dirname=None):
        super().__init__('wget')
        self.url = url
        self.slug = dirname or shared.slugfname(self.url)
        self.saveto = os.path.join(
            shared.config.get('source', 'offlinecopiesdir'),
            self.slug
        )

    def archive(self):
        """Run wget for the URL and return its stripped, decoded stdout."""
        cmd = (
            self.executable,
            '-e',
            'robots=off',
            '--timeout=360',
            '--no-clobber',
            '--no-directories',
            '--adjust-extension',
            '--span-hosts',
            '--wait=1',
            '--random-wait',
            '--convert-links',
            '--page-requisites',
            '--directory-prefix=%s' % self.saveto,
            "%s" % self.url
        )
        logging.debug('getting URL %s with wget', self.url)
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        stdout, stderr = p.communicate()
        if stderr:
            logging.error(
                "Error getting URL:\n\t%s\n\t%s",
                cmd,
                stderr
            )
        return stdout.decode('utf-8').strip()


# glob.glob() takes a single pattern: build it from the directory and the
# file mask
bookmarks = glob.glob(
    os.path.join(shared.config.get('dynamic', 'bookmarks'), '*.md')
)
bm = {}
for b in bookmarks:
    with open(b, 'rt') as f:
        fm = frontmatter.loads(f.read())
    if not fm.metadata.get('bookmark-of'):
        continue
    bm[b] = fm

for fname, fm in bm.items():
    logging.info('dealing with %s', fname)
    url = fm.metadata.get('bookmark-of')
    f, ext = os.path.splitext(os.path.basename(fname))
    p = os.path.join(
        shared.config.get('source', 'offlinecopiesdir'),
        f
    )
    # an existing directory means this bookmark was handled before
    if os.path.isdir(p):
        continue

    # NOTE(review): this used to call shared.find_archiveorgurl; the
    # function defined in this file is used instead - confirm the two are
    # meant to be the same.
    trueurl = find_archiveorgurl(url)
    if trueurl:
        w = wget(trueurl, dirname=f)
        w.archive()
    else:
        logging.error('no live or archived copy found for %s', url)

    # this is to skip the failed ones next time
    if not os.path.isdir(p):
        os.mkdir(p)
D
pesos.py
@@ -1,694 +0,0 @@
#!/usr/bin/env python3
"""Pull favourites from third party services into local markdown files."""

import json
import os
import hashlib
import glob
import frontmatter
import requests
import shared
import logging
import re
import shutil
import arrow
import bs4
from slugify import slugify
import oauth
import argparse


class Bookmark(object):
    """A bookmark entry written into the configured bookmarks directory."""

    def __init__(self, title, url, fname=None):
        self.fm = frontmatter.loads('')
        fname = fname or slugify(title)
        self.fname = "%s.md" % fname
        self.target = os.path.join(
            shared.config.get('source', 'contentdir'),
            shared.config.get('source', 'bookmarks'),
            self.fname
        )
        self.fm.metadata = {
            'published': arrow.utcnow().format(shared.ARROWISO),
            'title': title,
            'bookmark-of': url,
        }

    def write(self):
        """Dump the frontmatter document to the target file."""
        logging.info('saving bookmark to %s', self.target)
        with open(self.target, 'wt') as t:
            t.write(frontmatter.dumps(self.fm))


class HNBookmarks(object):
    """Scrape Hacker News favourites into Bookmark files."""

    prefix = 'hn-'

    def __init__(self):
        self.url = 'https://news.ycombinator.com/favorites?id=%s' % (
            shared.config.get('hackernews', 'user_id')
        )

    @property
    def existing(self):
        """Ids of already saved entries, derived from existing file names."""
        if hasattr(self, '_existing'):
            return self._existing

        d = os.path.join(
            shared.config.get('source', 'contentdir'),
            "*",
            "%s*.md" % self.prefix
        )
        files = sorted(glob.glob(d), reverse=True)
        self._existing = [
            os.path.basename(f.replace(self.prefix, '').replace('.md', ''))
            for f in files
        ]

        return self._existing

    def run(self):
        """Fetch the favourites page and save every entry not yet stored."""
        r = requests.get(self.url)
        soup = bs4.BeautifulSoup(r.text, "html5lib")
        rows = soup.find_all('tr', attrs={'class': 'athing'})
        for row in rows:
            rid = row.get('id')
            if rid in self.existing:
                continue

            link = row.find('a', attrs={'class': 'storylink'})
            url = link.get('href')
            title = " ".join(link.contents)
            fname = "%s%s" % (self.prefix, rid)

            bookmark = Bookmark(title, url, fname)
            bookmark.write()


class Fav(object):
    """Base class of a single favourite; subclasses set fname and fill fm."""

    def __init__(self):
        self.arrow = arrow.utcnow()
        self.fm = frontmatter.loads('')

    @property
    def target(self):
        """Path of the markdown file of this favourite."""
        return os.path.join(
            shared.config.get('source', 'contentdir'),
            shared.config.get('source', 'favs'),
            self.fname
        )

    @property
    def exists(self):
        """True when the markdown file is already on disk."""
        return os.path.isfile(self.target)

    @property
    def imgname(self):
        # the _ is to differentiate between my photos, where the md and jpg
        # name is the same, and favs
        return self.fname.replace('.md', '_.jpg')

    @property
    def imgtarget(self):
        """Path the image of this favourite is saved to."""
        return os.path.join(
            shared.config.get('source', 'filesdir'),
            self.imgname
        )

    def saveimg(self, url, target=None):
        """Download url to target (default: imgtarget), never overwriting."""
        target = target or self.imgtarget
        if os.path.isfile(target):
            logging.error("%s already exists, refusing to overwrite", target)
            return

        logging.info("pulling image %s to files", url)
        r = requests.get(url, stream=True)
        if r.status_code == 200:
            with open(target, 'wb') as f:
                # let urllib3 undo any content encoding while streaming
                r.raw.decode_content = True
                shutil.copyfileobj(r.raw, f)

    def write(self):
        """Write the markdown file and set its mtime to the fav's time."""
        logging.info('saving fav to %s', self.target)
        with open(self.target, 'wt') as t:
            t.write(frontmatter.dumps(self.fm))
        # Favs.lastpulled reads the newest mtime back from these files
        os.utime(self.target, (self.arrow.timestamp, self.arrow.timestamp))


class PinterestFav(Fav):
    """A favourite built from the JSON-LD block of a Pinterest page."""

    def __init__(self, url):
        super(PinterestFav, self).__init__()
        self.url = url
        self.fname = "pinterest-%s.md" % (list(filter(None, url.split('/')))[-1])

    def run(self):
        """Fetch the page, save its image and fill in the document."""
        try:
            r = requests.get(self.url)
            soup = bs4.BeautifulSoup(r.text, 'lxml')
            ld = json.loads(soup.find('script', type='application/ld+json').text)
            imgurl = ld.get('image')
            self.saveimg(imgurl)

            self.fm.metadata = {
                'published': arrow.get(
                    ld.get('datePublished', arrow.utcnow().timestamp)
                ).format(shared.ARROWISO),
                'title': ld.get('headline', self.url),
                'favorite-of': self.url,
                'image': self.imgname
            }
            content = ld.get('articleBody', '')
            content = shared.Pandoc(False).convert(content)
            self.fm.content = content

        except Exception as e:
            logging.error('saving pinterest fav %s failed: %s', self.url, e)
            return


class FlickrFav(Fav):
    """A favourite built from one photo record of the Flickr API."""

    url = 'https://api.flickr.com/services/rest/'

    def __init__(self, photo):
        super(FlickrFav, self).__init__()
        self.photo = photo
        self.ownerid = photo.get('owner')
        self.photoid = photo.get('id')
        self.fname = "flickr-%s-%s.md" % (self.ownerid, self.photoid)
        self.url = "https://www.flickr.com/photos/%s/%s" % (self.ownerid, self.photoid)

    def run(self):
        """Save the image (url_b, else url_z) and fill in the document."""
        img = self.photo.get('url_b', self.photo.get('url_z', False))
        if not img:
            logging.error("image url was empty for %s, skipping fav", self.url)
            return

        self.saveimg(img)
        self.arrow = arrow.get(
            self.photo.get('date_faved', arrow.utcnow().timestamp)
        )
        self.fm.metadata = {
            'published': self.arrow.format(shared.ARROWISO),
            'title': '%s' % self.photo.get('title', self.fname),
            'favorite-of': self.url,
            'flickr_tags': self.photo.get('tags', '').split(' '),
            'geo': {
                'latitude': self.photo.get('latitude', ''),
                'longitude': self.photo.get('longitude', ''),
            },
            'author': {
                'name': self.photo.get('owner_name'),
                'url': 'https://www.flickr.com/people/%s' % (
                    self.photo.get('owner')
                ),
            },
            'image': self.imgname
        }

        content = self.photo.get('description', {}).get('_content', '')
        content = shared.Pandoc(False).convert(content)
        self.fm.content = content


class FivehpxFav(Fav):
    """A favourite built from one photo record of the 500px API."""

    def __init__(self, photo):
        super(FivehpxFav, self).__init__()
        self.photo = photo
        self.ownerid = photo.get('user_id')
        self.photoid = photo.get('id')
        self.fname = "500px-%s-%s.md" % (self.ownerid, self.photoid)
        self.url = "https://www.500px.com%s" % (photo.get('url'))

    def run(self):
        """Save the first image of the record and fill in the document."""
        img = self.photo.get('images')[0].get('url')
        if not img:
            logging.error("image url was empty for %s, skipping fav", self.url)
            return

        self.saveimg(img)
        self.arrow = arrow.get(
            self.photo.get('created_at', arrow.utcnow().timestamp)
        )
        self.fm.metadata = {
            'published': self.arrow.format(shared.ARROWISO),
            'title': '%s' % self.photo.get('name', self.fname),
            'favorite-of': self.url,
            'fivehpx_tags': self.photo.get('tags', []),
            'geo': {
                'latitude': self.photo.get('latitude', ''),
                'longitude': self.photo.get('longitude', ''),
            },
            'author': {
                'name': self.photo.get('user').get('fullname', self.ownerid),
                'url': 'https://www.500px.com/%s' % (
                    self.photo.get('user').get('username', self.ownerid)
                ),
            },
            'image': self.imgname
        }

        content = self.photo.get('description', '')
        if content:
            content = shared.Pandoc(False).convert(content)
        else:
            content = ''
        self.fm.content = content


class TumblrFav(Fav):
    """A favourite built from one liked post of the Tumblr API."""

    def __init__(self, like):
        super(TumblrFav, self).__init__()
        self.like = like
        self.blogname = like.get('blog_name')
        self.postid = like.get('id')
        self.fname = "tumblr-%s-%s.md" % (self.blogname, self.postid)
        self.url = like.get('post_url')
        self.images = []

    def run(self):
        """Save every photo of the post and fill in the document."""
        for icntr, p in enumerate(self.like.get('photos', [])):
            i = p.get('original_size').get('url')
            logging.debug('parsing image %s', i)
            n = self.fname.replace('.md', '_%d.jpg' % icntr)
            self.images.append(n)
            nt = os.path.join(
                shared.config.get('source', 'filesdir'),
                n
            )
            self.saveimg(i, nt)

        self.arrow = arrow.get(
            self.like.get(
                'liked_timestamp',
                self.like.get('date', arrow.utcnow().timestamp)
            )
        )

        self.fm.content = self.like.get('caption', '')

        # title: summary, then slug, then a slug made of the post URL
        title = self.like.get('summary', '').strip()
        if not title:
            title = self.like.get('slug', '').strip()
        if not title:
            title = shared.slugfname(self.like.get('post_url'))

        self.fm.metadata = {
            'published': self.arrow.format(shared.ARROWISO),
            'title': title,
            'favorite-of': self.url,
            'tumblr_tags': self.like.get('tags'),
            'author': {
                'name': self.like.get('blog_name'),
                'url': 'http://%s.tumblr.com' % self.like.get('blog_name')
            },
            'images': self.images
        }


class DAFav(Fav):
    """A favourite built from one deviation record of the deviantART API."""

    def __init__(self, fav):
        super(DAFav, self).__init__()
        self.fav = fav
        self.deviationid = fav.get('deviationid')
        self.url = fav.get('url')
        self.title = fav.get('title', False) or self.deviationid
        self.author = self.fav.get('author').get('username')
        self.fname = "deviantart-%s-by-%s.md" % (
            slugify(self.title), slugify(self.author)
        )
        self.image = fav.get('content', {}).get('src')

    def run(self):
        """Save the image and fill in the document.

        Tags and description are read from ``self.fav['meta']``, which the
        caller is expected to have added before calling this.
        """
        self.saveimg(self.image)

        self.arrow = arrow.get(
            self.fav.get('published_time', arrow.utcnow().timestamp)
        )

        self.fm.metadata = {
            'published': self.arrow.format(shared.ARROWISO),
            'title': '%s' % self.title,
            'favorite-of': self.url,
            'da_tags': [t.get('tag_name') for t in self.fav.get('meta', {}).get('tags', [])],
            'author': {
                'name': self.author,
                'url': 'https://%s.deviantart.com' % (self.author),
            },
            'image': self.imgname
        }

        content = self.fav.get('meta', {}).get('description', '')
        content = shared.Pandoc(False).convert(content)
        self.fm.content = content


class Favs(object):
    """Base class of the per-service favourite collectors."""

    def __init__(self, confgroup):
        self.confgroup = confgroup

    @property
    def lastpulled(self):
        """Newest mtime among this service's fav files, plus one second.

        Evaluates to 1 when no file of the service exists yet.
        """
        d = os.path.join(
            shared.config.get('source', 'contentdir'),
            shared.config.get('source', 'favs'),
            "%s-*.md" % self.confgroup
        )
        mtime = max(
            (int(os.path.getmtime(f)) for f in glob.glob(d)),
            default=0
        )
        mtime = mtime + 1
        # the property is shared by every service, so name the actual one
        logging.debug("last %s fav timestamp: %s", self.confgroup, mtime)
        return mtime


class FlickrFavs(Favs):
    """Collect Flickr favourites newer than the last pulled one."""

    url = 'https://api.flickr.com/services/rest/'

    def __init__(self):
        super(FlickrFavs, self).__init__('flickr')
        self.get_uid()
        self.params = {
            'method': 'flickr.favorites.getList',
            'api_key': shared.config.get('flickr', 'api_key'),
            'user_id': self.uid,
            'extras': 'description,geo,tags,url_z,url_b,owner_name,date_upload',
            'per_page': 500,  # maximum
            'format': 'json',
            'nojsoncallback': '1',
            'min_fave_date': self.lastpulled
        }

    def get_uid(self):
        """Look up the user id of the configured username."""
        params = {
            'method': 'flickr.people.findByUsername',
            'api_key': shared.config.get('flickr', 'api_key'),
            'format': 'json',
            'nojsoncallback': '1',
            'username': shared.config.get('flickr', 'username'),
        }
        r = requests.get(
            self.url,
            params=params
        )
        parsed = json.loads(r.text)
        self.uid = parsed.get('user', {}).get('id')

    def getpaged(self, offset):
        """Return the photo records of result page number offset."""
        logging.info('requesting page #%d of paginated results', offset)
        self.params.update({
            'page': offset
        })
        r = requests.get(
            self.url,
            params=self.params
        )
        parsed = json.loads(r.text)
        return parsed.get('photos', {}).get('photo', [])

    def run(self):
        """Fetch every result page and save the favs not yet on disk."""
        r = requests.get(self.url, params=self.params)
        js = json.loads(r.text)
        js = js.get('photos', {})

        photos = list(js.get('photo', []))

        total = int(js.get('pages', 1))
        current = int(js.get('page', 1))
        for page in range(current + 1, total + 1):
            photos.extend(self.getpaged(page))

        for photo in photos:
            fav = FlickrFav(photo)
            if not fav.exists:
                fav.run()
                fav.write()


class FivehpxFavs(Favs):
    """Collect the items of a 500px gallery of the authenticated user."""

    def __init__(self):
        super(FivehpxFavs, self).__init__('500px')
        self.params = {
            'consumer_key': shared.config.get('500px', 'api_key'),
            'rpp': 100,  # maximum
            'image_size': 4,
            'include_tags': 1,
            'include_geo': 1,
            'sort': 'created_at',
            'sort_direction': 'desc'
        }
        self.oauth = oauth.FivehpxOauth()
        self.uid = None
        self.galid = None

    def get_uid(self):
        """Read the id of the authenticated user."""
        r = self.oauth.request(
            'https://api.500px.com/v1/users',
            params={}
        )
        js = json.loads(r.text)
        self.uid = js.get('user', {}).get('id')

    def get_favgalid(self):
        """Read the id of the last gallery of kind 5 of the user."""
        r = self.oauth.request(
            'https://api.500px.com/v1/users/%s/galleries' % (self.uid),
            params={
                # see https://github.com/500px/api-documentation/blob/master/basics/formats_and_terms.md#gallery-kinds
                'kinds': 5
            }
        )
        js = json.loads(r.text)
        g = js.get('galleries', []).pop()
        self.galid = g.get('id')

    @property
    def url(self):
        """Items endpoint of the gallery found by get_favgalid()."""
        return 'https://api.500px.com/v1/users/%s/galleries/%s/items' % (
            self.uid,
            self.galid
        )

    def getpaged(self, offset):
        """Return the photo records of result page number offset."""
        logging.info('requesting page #%d of paginated results', offset)
        self.params.update({
            'page': offset
        })
        r = requests.get(
            self.url,
            params=self.params
        )
        parsed = json.loads(r.text)
        return parsed.get('photos') or []

    def run(self):
        """Fetch every result page and save the favs not yet on disk."""
        self.get_uid()
        self.get_favgalid()

        r = requests.get(self.url, params=self.params)
        js = json.loads(r.text)
        # a response without 'photos' is treated as an empty page
        photos = list(js.get('photos') or [])

        total = int(js.get('total_pages', 1))
        current = int(js.get('current_page', 1))
        for page in range(current + 1, total + 1):
            photos.extend(self.getpaged(page))

        for photo in photos:
            fav = FivehpxFav(photo)
            if not fav.exists:
                fav.run()
                fav.write()


class TumblrFavs(Favs):
    """Collect the liked posts of the authenticated Tumblr user."""

    url = 'https://api.tumblr.com/v2/user/likes'
    # step between two requested offsets
    pagesize = 20

    def __init__(self):
        super(TumblrFavs, self).__init__('tumblr')
        self.oauth = oauth.TumblrOauth()
        self.params = {
            'after': self.lastpulled
        }
        self.likes = []

    def getpaged(self, offset):
        """Return the parsed response for the likes starting at offset."""
        r = self.oauth.request(
            self.url,
            params={'offset': offset}
        )
        return json.loads(r.text)

    def run(self):
        """Fetch every page of likes and save the ones not yet on disk."""
        r = self.oauth.request(
            self.url,
            params=self.params
        )

        js = json.loads(r.text)
        response = js.get('response', {})
        total = int(response.get('liked_count', self.pagesize))
        likes = list(response.get('liked_posts', []))
        for offset in range(self.pagesize, total, self.pagesize):
            paged = self.getpaged(offset)
            likes.extend(paged.get('response', {}).get('liked_posts', []))

        self.likes = likes
        for like in self.likes:
            fav = TumblrFav(like)
            if not fav.exists:
                fav.run()
                fav.write()


class DAFavs(Favs):
    """Collect the deviations of the 'Featured' deviantART collection."""

    def __init__(self):
        super(DAFavs, self).__init__('deviantart')
        # no trailing comma here: the username has to be a plain string,
        # not a one element tuple
        self.username = shared.config.get(self.confgroup, 'username')
        self.oauth = oauth.DAOauth()
        self.likes = []
        self.favs = []
        self.galid = None
        self.params = {
            'limit': 24,  # this is the max as far as I can tell
            'mature_content': 'true',
            'username': self.username
        }

    def get_favgalid(self):
        """Find the folder id of the collection named 'Featured'."""
        r = self.oauth.request(
            'https://www.deviantart.com/api/v1/oauth2/collections/folders',
            params={
                'username': self.username,
                'calculate_size': 'false',
                'ext_preload': 'false',
                'mature_content': 'true'
            }
        )
        js = json.loads(r.text)
        for g in js.get('results', []):
            if 'Featured' == g.get('name'):
                self.galid = g.get('folderid')
                break

    @property
    def url(self):
        """Endpoint of the collection found by get_favgalid()."""
        return 'https://www.deviantart.com/api/v1/oauth2/collections/%s' % (self.galid)

    def getpaged(self, offset):
        """Return the parsed response for the results starting at offset."""
        self.params.update({'offset': offset})
        r = self.oauth.request(
            self.url,
            self.params
        )
        js = json.loads(r.text)
        return js

    def getsinglemeta(self, daid):
        """Return the metadata record of one deviation.

        When the response cannot be parsed, or has no usable 'metadata'
        list, whatever was parsed so far (possibly an empty dict) is
        returned instead.
        """
        r = self.oauth.request(
            'https://www.deviantart.com/api/v1/oauth2/deviation/metadata',
            params={
                'deviationids[]': daid,
                'ext_submission': False,
                'ext_camera': False,
                'ext_stats': False,
                'ext_collection': False,
                'mature_content': True,
            }
        )
        meta = {}
        try:
            meta = json.loads(r.text)
            return meta.get('metadata', []).pop()
        except (ValueError, IndexError, AttributeError, TypeError):
            return meta

    def has_more(self, q):
        """Normalize the has_more flag, which may be a bool or a string."""
        return q in (True, 'True', 'true')

    def run(self):
        """Fetch every result page and save the favs not yet on disk."""
        self.get_favgalid()

        r = self.oauth.request(
            self.url,
            self.params
        )

        js = json.loads(r.text)
        favs = list(js.get('results', []))
        has_more = self.has_more(js.get('has_more'))
        offset = js.get('next_offset')
        # next_offset is used as the offset of the following request as-is;
        # adding it to the previous offset would skip results.
        # NOTE(review): confirm against the deviantART pagination docs.
        while has_more and offset is not None:
            offset = int(offset)
            logging.info('iterating over DA results with offset %d', offset)
            paged = self.getpaged(offset)
            new = paged.get('results', [])
            if not new:
                break
            favs.extend(new)
            has_more = self.has_more(paged.get('has_more'))
            offset = paged.get('next_offset')

        self.favs = favs
        for fav in self.favs:
            f = DAFav(fav)
            if f.exists:
                continue

            f.fav.update({'meta': self.getsinglemeta(fav.get('deviationid'))})
            f.run()
            f.write()


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Parameters for NASG')
    parser.add_argument(
        '--loglevel',
        default='error',
        help='change loglevel'
    )

    params = vars(parser.parse_args())

    # drop handlers installed by imported modules before configuring ours
    while len(logging.root.handlers) > 0:
        logging.root.removeHandler(logging.root.handlers[-1])

    logging.basicConfig(
        level=shared.LLEVEL[params.get('loglevel')],
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # NOTE: HNBookmarks is defined above but is not run from here.
    flickr = FlickrFavs()
    flickr.run()

    fivehpx = FivehpxFavs()
    fivehpx.run()

    tumblr = TumblrFavs()
    tumblr.run()

    da = DAFavs()
    da.run()
D
search.py
@@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""Small Sanic service that answers /search from the site's Whoosh index."""

import os

import asyncio
import uvloop
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
from whoosh import index
from whoosh import qparser
import jinja2
import shared


def SearchHandler(query, tmpl):
    """Search the Whoosh index for *query* and render the hits with *tmpl*.

    Args:
        query: raw search string from the request; ``+`` is rewritten to
            `` AND `` and `` -`` to `` NOT `` before parsing.
        tmpl: template object exposing ``render(dict)``; it receives the
            keys ``term`` (the rewritten query) and ``posts`` (a dict of
            results keyed by URL).

    Returns:
        A sanic text response with status 400 when *query* is empty,
        otherwise an HTML response with status 200.
    """
    if not query:
        return sanic.response.text(
            "You seem to have forgot to enter what you want to search for. Please try again.",
            status=400
        )

    query = query.replace('+', ' AND ').replace(' -', ' NOT ')
    ix = index.open_dir(os.path.abspath(os.path.join(
        shared.config.get('target', 'builddir'),
        shared.config.get('var', 'searchdb')
    )))

    qp = qparser.MultifieldParser(
        ["title", "content"],
        schema=shared.schema
    )
    q = qp.parse(query)

    results = {}
    # The searcher holds open index files; use it as a context manager so
    # they are released after every request. Hits are consumed inside the
    # block because highlighting is done while the searcher is still open.
    with ix.searcher() as searcher:
        r = searcher.search(q, sortedby="weight", limit=100)
        logging.info("results for '%s': %i", query, len(r))
        for result in r:
            url = result['url']
            # keep only the first hit for any given URL
            if url in results:
                continue

            res = {
                'title': result['title'],
                'highlight': result.highlights("content"),
            }
            if 'img' in result:
                res['img'] = result['img']
            results[url] = res

    tvars = {
        'term': query,
        'posts': results,
    }

    logging.info("collected %i results to render", len(results))
    return sanic.response.html(tmpl.render(tvars), status=200)


if __name__ == '__main__':
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = Sanic()

    jldr = jinja2.FileSystemLoader(
        searchpath=shared.config.get('source', 'templatesdir')
    )
    jenv = jinja2.Environment(loader=jldr)
    tmpl = jenv.get_template('searchresults.html')

    @app.route("/search", methods=["GET"])
    async def search(request):
        """Handle GET /search?s=<query>."""
        query = request.args.get('s')
        return SearchHandler(query, tmpl)

    app.run(host="127.0.0.1", port=8001, debug=True)
D
tagmyloc.py
@@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""Sanic service suggesting tags for a location from Flickr and 500px."""

import asyncio
import uvloop
import os

from collections import Counter

from sanic import Sanic
import sanic.response
from sanic.log import log as logging
import requests
import shared
import json

# Search radius (km) used when the request does not supply one.
DEFAULT_RADIUS = 0.5


def _rank_tags(tag_lists, num):
    """Return the *num* most frequent non-empty tags across *tag_lists*.

    Ties keep first-seen order, because the sort is stable and the counter
    preserves insertion order.
    """
    counts = Counter()
    for tags in tag_lists:
        counts.update(t for t in tags if t)
    return sorted(counts, key=counts.get, reverse=True)[:num]


def locationtags_500px(lat, lon, radius=0.5, num=10):
    """Collect the most common 500px tags of photos around a coordinate.

    Args:
        lat, lon: coordinate to search around.
        radius: search radius, sent to the API as ``<radius>km``.
        num: maximum number of tags to return.

    Returns:
        A ``(tags, status)`` tuple. ``status`` is 200 on success, 400 when
        a coordinate is missing, or the HTTP status of the upstream
        response when its body could not be parsed as JSON.
    """
    tags = []
    if not lat or not lon:
        # Always return a (tags, status) pair: the caller unpacks two values.
        return tags, 400

    logging.info("requesting locationtags from 500px for '%s, %s'", lat, lon)
    params = {
        'rpp': 100,
        'geo': "%s,%s,%skm" % (lat, lon, radius),
        'consumer_key': shared.config.get('500px', 'api_key'),
        'tags': 1,
    }

    r = requests.get('https://api.500px.com/v1/photos/search', params=params)
    try:
        results = json.loads(r.text)
    except Exception as e:
        logging.error('failed to load results for 500px request: %s', e)
        logging.error('request was: %s', r.url)
        return tags, r.status_code

    tag_lists = (p.get('tags', []) for p in results.get('photos', []))
    return _rank_tags(tag_lists, num), 200


def locationtags_flickr(lat, lon, radius=0.5, num=10):
    """Collect the most common Flickr tags of photos around a coordinate.

    Args:
        lat, lon: coordinate to search around.
        radius: search radius passed to ``flickr.photos.search``.
        num: maximum number of tags to return.

    Returns:
        A ``(tags, status)`` tuple. ``status`` is 200 on success, 400 when
        a coordinate is missing, or the HTTP status of the upstream
        response when its body could not be parsed as JSON.
    """
    tags = []
    if not lat or not lon:
        # Always return a (tags, status) pair: the caller unpacks two values.
        return tags, 400

    logging.info("requesting locationtags from Flickr for '%s, %s'", lat, lon)
    params = {
        'method': 'flickr.photos.search',
        'api_key': shared.config.get('flickr', 'api_key'),
        'has_geo': 1,
        'lat': lat,
        'lon': lon,
        'radius': radius,
        'extras': ','.join(['tags', 'machine_tags']),
        'per_page': 500,
        'format': 'json',
        'nojsoncallback': 1
    }

    r = requests.get('https://api.flickr.com/services/rest/', params=params)
    try:
        results = json.loads(r.text)
    except Exception as e:
        logging.error('failed to load results for Flickr request: %s', e)
        logging.error('request was: %s', r.url)
        return tags, r.status_code

    # Each photo's tags are read as one space-separated string.
    photos = results.get('photos', {}).get('photo', {})
    tag_lists = (p.get('tags', '').split(' ') for p in photos)
    return _rank_tags(tag_lists, num), 200


def RequestHandler(lat, lon, rad, num=20):
    """Query both services and return their tags as one JSON response.

    Args:
        lat, lon: coordinate from the request.
        rad: search radius; falls back to ``DEFAULT_RADIUS`` when empty so
            a missing parameter is not sent upstream as ``None``.
        num: maximum number of tags per service.

    Returns:
        A sanic JSON response with keys ``flickr`` and ``500px``. Its status
        is the first non-200 status of the two lookups, otherwise 200.
    """
    if not rad:
        rad = DEFAULT_RADIUS

    ftags, fstatus = locationtags_flickr(lat, lon, rad, num)
    fivehtags, fivehstatus = locationtags_500px(lat, lon, rad, num)

    # Do not let a successful 500px call hide a failed Flickr call.
    status = fstatus if fstatus != 200 else fivehstatus

    return sanic.response.json({
        'flickr': ftags,
        '500px': fivehtags,
    }, status=status)


if __name__ == '__main__':
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = Sanic()

    # ``methods`` belongs on the route decorator, not on the handler signature.
    @app.route("/tagmyloc", methods=["GET"])
    async def search(request):
        """Handle GET /tagmyloc?lat=..&lon=..&rad=.."""
        lat = request.args.get('lat')
        lon = request.args.get('lon')
        rad = request.args.get('rad')
        return RequestHandler(lat, lon, rad)

    app.run(host="127.0.0.1", port=8003, debug=True)
D
update.sh
@@ -1,13 +0,0 @@
#!/usr/bin/env bash
#
# Rebuild the site with nasg when a source file is newer than the last build.
# Always exits 0, whether or not a build was needed or succeeded.

base="/web/petermolnar.net/petermolnar.net"

# Most recently modified file (max depth 2) whose `ls -l` line mentions one of
# the source directories.
# NOTE(review): parsing `ls` output breaks for paths containing whitespace, and
# xargs may split a very long file list into several separately sorted `ls`
# runs - confirm neither can happen for this tree.
lastmodfile="$(find "${base}/" -maxdepth 2 -type f -print0 | xargs -0r ls -ltr | grep -E '(content|comments|copy|files|offlinecopies|photos)' | tail -1 | awk '{print $9}')"

# Nothing matched (or the parsed name is not a real path): nothing to compare.
if [ -z "$lastmodfile" ] || [ ! -e "$lastmodfile" ]; then
    exit 0
fi
lastmod=$(stat -c %Y "$lastmodfile")

# The build marker's mtime is the time of the last run; if the marker is
# missing, treat the site as never built so the first build is triggered.
lastrunfile="${base}/build/magic.php"
if [ -e "$lastrunfile" ]; then
    lastrun=$(stat -c %Y "$lastrunfile")
else
    lastrun=0
fi

if [ "$lastrun" -lt "$lastmod" ]; then
    # Only run the generator if changing into its directory succeeded.
    cd "${base}/nasg" && python3.5 nasg.py --loglevel info
fi

exit 0
D
webmention.py
@@ -1,209 +0,0 @@
#!/usr/bin/env python3
"""Sanic service that receives webmentions, stores them and sends a mail."""

import asyncio
import uvloop
import os
import hashlib
import json
import urllib.parse
import frontmatter
from sanic import Sanic
import sanic.response
from sanic.log import log as logging
import validators
import arrow
from webmentiontools import urlinfo
import shared
import envelope
import bleach


class WebmentionHandler(object):
    """Validate, fetch, store and announce one incoming webmention.

    After ``run()`` the HTTP response to send back is available as ``r``.
    """

    def __init__(self, source, target):
        """Remember the two URLs and start with a generic 500 response."""
        self.source = source
        self.target = target
        self.now = arrow.utcnow().timestamp
        logging.info("incoming webmention %s => %s", self.source, self.target)

        # Default answer; each step replaces it with a more specific one.
        self.r = sanic.response.text(
            "something went wrong on my side, could you please let me know at hello@petermolnar.eu ?",
            status=500
        )

    def run(self):
        """Process the webmention; stop at the first step that fails."""
        if not self._validate():
            return

        # A failed fetch or a source that does not link to the target must
        # not be saved; the error response set by _parse() is kept.
        if not self._parse():
            return

        if self._save():
            self._notify()

    def _validate(self):
        """Check both URLs and the domain rules.

        Returns:
            True when the mention may be processed. Otherwise False, with
            ``self.r`` set to a 400 response describing the problem.
        """
        checks = (
            (self.source, '"souce" parameter is an invalid URL'),
            (self.target, '"target" parameter is an invalid URL'),
        )
        for url, emsg in checks:
            logging.debug("validating URL %s", url)
            # A missing form field arrives as None; reject it before it
            # reaches the validator.
            if not url or not validators.url(url):
                self.r = sanic.response.text(
                    emsg,
                    status=400
                )
                return False

        logging.debug("checking target domain")
        _target_domain = urllib.parse.urlparse(self.target).netloc
        _mydomains = shared.config.get('site', 'domains').split(" ")
        if _target_domain not in _mydomains:
            self.r = sanic.response.text(
                "'target' is not in the list of allowed domains",
                status=400
            )
            return False

        logging.debug("checking selfpings")
        _source_domain = urllib.parse.urlparse(self.source).netloc
        if _source_domain in _mydomains:
            self.r = sanic.response.text(
                "selfpings are not allowed",
                status=400
            )
            return False

        return True

    def _parse(self):
        """Fetch source and target and verify the source links to the target.

        Returns:
            True when both fetches succeed and the link exists. Otherwise
            False, with ``self.r`` set to the matching 408 or 400 response.
        """
        logging.debug("fetching %s", self.source)
        self._source = urlinfo.UrlInfo(self.source)
        if self._source.error:
            self.r = sanic.response.text(
                "couldn't fetch 'source' from %s" % (self.source),
                status=408
            )
            return False

        if not self._source.linksTo(self.target):
            self.r = sanic.response.text(
                "'source' (%s) does not link to 'target' (%s)" % (
                    self.source,
                    self.target
                ),
                status=400
            )
            return False

        logging.debug("fetching %s", self.target)
        self._target = urlinfo.UrlInfo(self.target)
        if self._target.error:
            self.r = sanic.response.text(
                "couldn't fetch 'target' from %s" % (self.target),
                status=408
            )
            return False

        return True

    def _accepted(self):
        """Set the response to 202 accepted."""
        self.r = sanic.response.text(
            "accepted",
            status=202
        )

    def _save(self):
        """Write the mention to the comments directory as a frontmatter file.

        The file is named after ``mhash``. An existing file with identical
        content is left untouched.

        Returns:
            True when a file was written, False for an unchanged repeat
            ping. The response is set to 202 in both cases.
        """
        target = os.path.join(
            shared.config.get('source', 'commentsdir'),
            "%s.md" % self.mhash
        )

        exists = os.path.isfile(target)
        if exists:
            with open(target) as f:
                doc = frontmatter.loads(f.read())
        else:
            doc = frontmatter.loads('')

        if self.content == doc.content:
            logging.warning('repinged target, no update needed')
            self._accepted()
            return False

        doc.metadata = self.meta
        doc.content = self.content
        if exists:
            logging.warning('updating existing webmention %s', target)
        else:
            logging.warning('saving incoming webmention to %s', target)

        with open(target, 'wt') as t:
            t.write(frontmatter.dumps(doc))
        self._accepted()
        return True

    def _notify(self):
        """Mail a plain-text summary of the mention to the configured address."""
        author = self.meta['author']
        # meta stores the author as the text returned by bleach.clean();
        # mapping-style author data is still accepted if it is ever provided.
        if isinstance(author, dict):
            author_name = author.get('name', self.source)
            author_url = author.get('url', self.source)
            author_email = author.get('email', '')
        else:
            author_name = author or self.source
            author_url = self.source
            author_email = ''

        text = "\nsource URL\n: %s\n\ntarget URL:\n: %s\n\ndate\n: %s\n\nauthor name:\n: %s\n\nauthor URL:\n: %s\n\nauthor email:\n: %s\n\n---\n\n%s" % (
            self.source,
            self.target,
            self.meta['date'],
            author_name,
            author_url,
            author_email,
            self.content
        )

        l = envelope.Letter(
            sender=(
                shared.config.get('webmention', 'from_name'),
                shared.config.get('webmention', 'from_address')
            ),
            recipient=(
                shared.config.get('webmention', 'to_name'),
                shared.config.get('webmention', 'to_address')
            ),
            subject="[webmention] %s" % self.source,
            text=text
        )
        l.make()
        l.send()

    @property
    def mhash(self):
        """SHA-1 hex digest of the key-sorted JSON dump of ``meta``."""
        return hashlib.sha1(json.dumps(self.meta, sort_keys=True).encode('utf-8')).hexdigest()

    @property
    def meta(self):
        """Metadata dict of the mention, built once from the fetched source."""
        if hasattr(self, '_meta'):
            return self._meta

        self._meta = {
            'author': bleach.clean(self._source.author, tags=[], strip_comments=True, strip=True),
            'type': self._source.relationType,
            'target': self.target,
            'source': self.source,
            'date': arrow.get(self._source.pubDate).format(shared.ARROWISO),
        }

        return self._meta

    @property
    def content(self):
        """Source content converted through ``shared.Pandoc``, cached."""
        if hasattr(self, '_content'):
            return self._content

        self._content = shared.Pandoc(False).convert(self._source.content)
        return self._content


if __name__ == '__main__':
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = Sanic()

    @app.route("/webmention", methods=["POST"])
    async def wm(request):
        """Handle POST /webmention with form fields ``source`` and ``target``."""
        source = request.form.get('source')
        target = request.form.get('target')
        r = WebmentionHandler(source, target)
        r.run()
        return r.r

    app.run(host="127.0.0.1", port=8002, debug=True)