mirror of
https://github.com/coleifer/peewee.git
synced 2026-05-06 07:56:41 -04:00
b3d43677cd
[skip ci]
271 lines
9.4 KiB
Python
271 lines
9.4 KiB
Python
import datetime
|
|
import functools
|
|
import os
|
|
import re
|
|
import urllib
|
|
|
|
from flask import (Flask, flash, redirect, render_template, request,
|
|
Response, session, url_for)
|
|
from markdown import markdown
|
|
from markdown.extensions.codehilite import CodeHiliteExtension
|
|
from markdown.extensions.extra import ExtraExtension
|
|
from markupsafe import Markup
|
|
from micawber import bootstrap_basic, parse_html
|
|
from micawber.cache import Cache as OEmbedCache
|
|
from peewee import *
|
|
from playhouse.flask_utils import FlaskDB, get_object_or_404, object_list
|
|
from playhouse.sqlite_ext import *
|
|
|
|
|
|
# Blog configuration values.
|
|
|
|
# You may consider using a one-way hash to generate the password, and then
|
|
# use the hash again in the login view to perform the comparison. This is just
|
|
# for simplicity.
|
|
ADMIN_PASSWORD = 'secret'
|
|
APP_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
# The playhouse.flask_utils.FlaskDB object accepts database URL configuration.
|
|
DATABASE = 'sqlite:///%s?rank_functions=1' % os.path.join(APP_DIR, 'blog.db')
|
|
DEBUG = False
|
|
|
|
# The secret key is used internally by Flask to encrypt session data stored
|
|
# in cookies. Make this unique for your app.
|
|
SECRET_KEY = 'shhh, secret!'
|
|
|
|
# This is used by micawber, which will attempt to generate rich media
|
|
# embedded objects with maxwidth=800.
|
|
SITE_WIDTH = 800
|
|
|
|
|
|
# Create a Flask WSGI app and configure it using values from the module.
|
|
app = Flask(__name__)
|
|
app.config.from_object(__name__)
|
|
|
|
# FlaskDB is a wrapper for a peewee database that sets up pre/post-request
|
|
# hooks for managing database connections.
|
|
flask_db = FlaskDB(app)
|
|
|
|
# The `database` is the actual peewee database, as opposed to flask_db which is
|
|
# the wrapper.
|
|
database = flask_db.database
|
|
|
|
# Configure micawber with the default OEmbed providers (YouTube, Flickr, etc).
|
|
# We'll use a simple in-memory cache so that multiple requests for the same
|
|
# video don't require multiple network requests.
|
|
oembed_providers = bootstrap_basic(OEmbedCache())
|
|
|
|
|
|
class Entry(flask_db.Model):
|
|
title = CharField()
|
|
slug = CharField(unique=True)
|
|
content = TextField()
|
|
published = BooleanField(index=True)
|
|
timestamp = DateTimeField(default=datetime.datetime.now, index=True)
|
|
|
|
@property
|
|
def html_content(self):
|
|
"""
|
|
Generate HTML representation of the markdown-formatted blog entry,
|
|
and also convert any media URLs into rich media objects such as video
|
|
players or images.
|
|
"""
|
|
hilite = CodeHiliteExtension(linenums=False, css_class='highlight')
|
|
extras = ExtraExtension()
|
|
markdown_content = markdown(self.content, extensions=[hilite, extras])
|
|
oembed_content = parse_html(
|
|
markdown_content,
|
|
oembed_providers,
|
|
urlize_all=True,
|
|
maxwidth=app.config['SITE_WIDTH'])
|
|
return Markup(oembed_content)
|
|
|
|
def save(self, *args, **kwargs):
|
|
# Generate a URL-friendly representation of the entry's title.
|
|
if not self.slug:
|
|
self.slug = re.sub(r'[^\w]+', '-', self.title.lower()).strip('-')
|
|
ret = super(Entry, self).save(*args, **kwargs)
|
|
|
|
# Store search content.
|
|
self.update_search_index()
|
|
return ret
|
|
|
|
def update_search_index(self):
|
|
# Create a row in the FTSEntry table with the post content. This will
|
|
# allow us to use SQLite's awesome full-text search extension to
|
|
# search our entries.
|
|
exists = (FTSEntry
|
|
.select(FTSEntry.docid)
|
|
.where(FTSEntry.docid == self.id)
|
|
.exists())
|
|
content = '\n'.join((self.title, self.content))
|
|
if exists:
|
|
(FTSEntry
|
|
.update({FTSEntry.content: content})
|
|
.where(FTSEntry.docid == self.id)
|
|
.execute())
|
|
else:
|
|
FTSEntry.insert({
|
|
FTSEntry.docid: self.id,
|
|
FTSEntry.content: content}).execute()
|
|
|
|
@classmethod
|
|
def public(cls):
|
|
return Entry.select().where(Entry.published == True)
|
|
|
|
@classmethod
|
|
def drafts(cls):
|
|
return Entry.select().where(Entry.published == False)
|
|
|
|
@classmethod
|
|
def search(cls, query):
|
|
words = [word.strip() for word in query.split() if word.strip()]
|
|
if not words:
|
|
# Return an empty query.
|
|
return Entry.noop()
|
|
else:
|
|
search = ' '.join(words)
|
|
|
|
# Query the full-text search index for entries matching the given
|
|
# search query, then join the actual Entry data on the matching
|
|
# search result.
|
|
return (Entry
|
|
.select(Entry, FTSEntry.rank().alias('score'))
|
|
.join(FTSEntry, on=(Entry.id == FTSEntry.docid))
|
|
.where(
|
|
FTSEntry.match(search) &
|
|
(Entry.published == True))
|
|
.order_by(SQL('score')))
|
|
|
|
class FTSEntry(FTSModel):
|
|
content = TextField()
|
|
|
|
class Meta:
|
|
database = database
|
|
|
|
def login_required(fn):
|
|
@functools.wraps(fn)
|
|
def inner(*args, **kwargs):
|
|
if session.get('logged_in'):
|
|
return fn(*args, **kwargs)
|
|
return redirect(url_for('login', next=request.path))
|
|
return inner
|
|
|
|
@app.route('/login/', methods=['GET', 'POST'])
|
|
def login():
|
|
next_url = request.args.get('next') or request.form.get('next')
|
|
if request.method == 'POST' and request.form.get('password'):
|
|
password = request.form.get('password')
|
|
# TODO: If using a one-way hash, you would also hash the user-submitted
|
|
# password and do the comparison on the hashed versions.
|
|
if password == app.config['ADMIN_PASSWORD']:
|
|
session['logged_in'] = True
|
|
session.permanent = True # Use cookie to store session.
|
|
flash('You are now logged in.', 'success')
|
|
return redirect(next_url or url_for('index'))
|
|
else:
|
|
flash('Incorrect password.', 'danger')
|
|
return render_template('login.html', next_url=next_url)
|
|
|
|
@app.route('/logout/', methods=['GET', 'POST'])
|
|
def logout():
|
|
if request.method == 'POST':
|
|
session.clear()
|
|
return redirect(url_for('login'))
|
|
return render_template('logout.html')
|
|
|
|
@app.route('/')
|
|
def index():
|
|
search_query = request.args.get('q')
|
|
if search_query:
|
|
query = Entry.search(search_query)
|
|
else:
|
|
query = Entry.public().order_by(Entry.timestamp.desc())
|
|
|
|
# The `object_list` helper will take a base query and then handle
|
|
# paginating the results if there are more than 20. For more info see
|
|
# the docs:
|
|
# http://docs.peewee-orm.com/en/latest/peewee/playhouse.html#object_list
|
|
return object_list(
|
|
'index.html',
|
|
query,
|
|
search=search_query,
|
|
check_bounds=False)
|
|
|
|
def _create_or_edit(entry, template):
|
|
if request.method == 'POST':
|
|
entry.title = request.form.get('title') or ''
|
|
entry.content = request.form.get('content') or ''
|
|
entry.published = request.form.get('published') or False
|
|
if not (entry.title and entry.content):
|
|
flash('Title and Content are required.', 'danger')
|
|
else:
|
|
# Wrap the call to save in a transaction so we can roll it back
|
|
# cleanly in the event of an integrity error.
|
|
try:
|
|
with database.atomic():
|
|
entry.save()
|
|
except IntegrityError:
|
|
flash('Error: this title is already in use.', 'danger')
|
|
else:
|
|
flash('Entry saved successfully.', 'success')
|
|
if entry.published:
|
|
return redirect(url_for('detail', slug=entry.slug))
|
|
else:
|
|
return redirect(url_for('edit', slug=entry.slug))
|
|
|
|
return render_template(template, entry=entry)
|
|
|
|
@app.route('/create/', methods=['GET', 'POST'])
|
|
@login_required
|
|
def create():
|
|
return _create_or_edit(Entry(title='', content=''), 'create.html')
|
|
|
|
@app.route('/drafts/')
|
|
@login_required
|
|
def drafts():
|
|
query = Entry.drafts().order_by(Entry.timestamp.desc())
|
|
return object_list('index.html', query, check_bounds=False)
|
|
|
|
@app.route('/<slug>/')
|
|
def detail(slug):
|
|
if session.get('logged_in'):
|
|
query = Entry.select()
|
|
else:
|
|
query = Entry.public()
|
|
entry = get_object_or_404(query, Entry.slug == slug)
|
|
return render_template('detail.html', entry=entry)
|
|
|
|
@app.route('/<slug>/edit/', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit(slug):
|
|
entry = get_object_or_404(Entry, Entry.slug == slug)
|
|
return _create_or_edit(entry, 'edit.html')
|
|
|
|
@app.template_filter('clean_querystring')
|
|
def clean_querystring(request_args, *keys_to_remove, **new_values):
|
|
# We'll use this template filter in the pagination include. This filter
|
|
# will take the current URL and allow us to preserve the arguments in the
|
|
# querystring while replacing any that we need to overwrite. For instance
|
|
# if your URL is /?q=search+query&page=2 and we want to preserve the search
|
|
# term but make a link to page 3, this filter will allow us to do that.
|
|
querystring = dict((key, value) for key, value in request_args.items())
|
|
for key in keys_to_remove:
|
|
querystring.pop(key, None)
|
|
querystring.update(new_values)
|
|
return urllib.urlencode(querystring)
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(exc):
|
|
return Response('<h3>Not found</h3>'), 404
|
|
|
|
def main():
|
|
database.create_tables([Entry, FTSEntry], safe=True)
|
|
app.run(debug=True)
|
|
|
|
if __name__ == '__main__':
|
|
print('To login, open:')
|
|
print('http://127.0.0.1:5000/login/')
|
|
print('password is: %s' % ADMIN_PASSWORD)
|
|
main()
|