Source code for digital_milliet.lib.oauth
from flask_oauthlib.client import OAuth
from flask import redirect, url_for, session, request, jsonify, render_template
from functools import wraps
class OAuthHelper(object):
    """ Helper class providing OAuth2 functionality to the application

    Implements flask_oauthlib.client. On construction it builds the remote
    OAuth application from the Flask config and registers three routes on
    the app: ``/oauth/login``, ``/oauth/authorized`` and ``/oauth/logout``.

    The authenticated user is tracked in the Flask session under the keys
    ``oauth_user_uri`` and ``oauth_user_name``; the access token is kept
    under ``oauth_token``.
    """

    def __init__(self, app):
        """ Constructor

        :param app: the wrapped flask app
        :type app: Flask
        :raises KeyError: if a required ``OAUTH_*`` / ``ENFORCE_COMMUNITY_ID``
            setting is missing from ``app.config``
        """
        oauth = OAuth(app)
        self.authobj = oauth.remote_app(
            app.config['OAUTH_NAME'],
            consumer_key=app.config['OAUTH_CONSUMER_KEY'],
            consumer_secret=app.config['OAUTH_CONSUMER_SECRET'],
            request_token_params=app.config['OAUTH_REQUEST_TOKEN_PARAMS'],
            base_url=app.config['OAUTH_BASE_URL'],
            request_token_url=app.config['OAUTH_REQUEST_TOKEN_URL'],
            access_token_method=app.config['OAUTH_ACCESS_TOKEN_METHOD'],
            access_token_url=app.config['OAUTH_ACCESS_TOKEN_URL'],
            authorize_url=app.config['OAUTH_AUTHORIZE_URL']
        )
        self.authobj.tokengetter(self.oauth_token)
        self.authcallback = app.config['OAUTH_CALLBACK_URL']
        # NOTE(review): stored here but not read by any method visible in
        # this class - presumably consumed by user_in_community; confirm.
        self.enforce_community_id = app.config['ENFORCE_COMMUNITY_ID']
        # Optional development-only override; None when not configured.
        self.auth_override = app.config.get('OAUTH_USER_OVERRIDE')

        app.add_url_rule('/oauth/login', view_func=self.r_oauth_login)
        app.add_url_rule(
            '/oauth/authorized',
            view_func=self.r_oauth_authorized)
        app.add_url_rule('/oauth/logout', view_func=self.r_oauth_logout)

    def r_oauth_login(self):
        """ Route for OAuth2 Login

        :param next: next url (read from the query string)
        :type next: string
        :return: Redirects to OAuth Provider Login URL, or, when the
            development override is configured, straight to the post-login
            response
        """
        # NOTE(review): 'next' comes from the query string and is later
        # passed to redirect() without validation (open redirect). Consider
        # restricting it to same-host URLs.
        session['next'] = request.args.get('next', '')

        # overrides the oauth functionality for ease of development
        # this override should not be used in production
        if self.auth_override:
            session['oauth_user_uri'] = self.auth_override['oauth_user_uri']
            session['oauth_user_name'] = self.auth_override['oauth_user_name']
            # Use the same post-login handling as the real callback so an
            # empty 'next' renders a page instead of redirecting to ''.
            return self._post_login_response()

        callback_url = self.authcallback
        if callback_url is None:
            callback_url = url_for('.r_oauth_authorized', _external=True)
        return self.authobj.authorize(callback=callback_url)

    def r_oauth_authorized(self):
        """ Route for OAuth2 Authorization callback

        :return: an access denied message, a redirect to the stored next
            url, or the rendered ``authorized.html`` template
        """
        resp = self.authobj.authorized_response()
        if resp is None:
            # Local import keeps this block self-contained.
            from html import escape
            # The error details come from the query string: they may be
            # absent, and they are attacker-controllable, so default them
            # and escape them before echoing into the response.
            return 'Access denied: reason=%s error=%s' % (
                escape(request.args.get('error', 'unknown')),
                escape(request.args.get('error_description', 'unknown'))
            )

        session['oauth_token'] = (resp['access_token'], '')
        user = self.authobj.get('user')

        # TODO this is too specific to Perseids' api model. We should
        # externalize.
        # NOTE(review): user_in_community is not defined in the visible part
        # of this class - confirm it is provided elsewhere.
        if not self.user_in_community(user.data['user']['communities']):
            return 'Access denied: reason=%s error=%s' % (
                "Not in Community",
                "Not in Community"
            )

        session['oauth_user_uri'] = user.data['user']['uri']
        session['oauth_user_name'] = user.data['user']['full_name']
        return self._post_login_response()

    @staticmethod
    def _post_login_response():
        """ Build the response sent once the user is stored in the session

        :return: redirect to the session's ``next`` url when one is set,
            otherwise the rendered ``authorized.html`` template
        """
        target = session.get('next')
        if target:
            return redirect(target)
        return render_template(
            'authorized.html',
            username=session['oauth_user_name'])

    @staticmethod
    def r_oauth_logout():
        """ Route to clear the oauth data from the session

        :param next: next url (read from the query string)
        :type next: string
        :return: redirects to next or renders template
        """
        session.pop('oauth_user_uri', None)
        session.pop('oauth_user_name', None)
        # The access token is oauth data too; do not leave it behind.
        session.pop('oauth_token', None)

        # NOTE(review): unvalidated redirect target, see r_oauth_login.
        next_url = request.args.get('next', '')
        if next_url:
            return redirect(next_url)
        return render_template('index.html')

    @staticmethod
    def oauth_token(token=None):
        """ tokengetter function

        :param token: the Oauth token (unused; accepted because this
            function is registered as the tokengetter)
        :type token: string
        :return: the value stored under ``oauth_token`` in the session, or
            None when there is none
        """
        return session.get('oauth_token')

    @staticmethod
    def current_user():
        """ Gets the current user from the session

        :return: { uri => <uri>, name => <name> }, or None when no user is
            logged in. The name falls back to the uri when the session holds
            no ``oauth_user_name``.
        :rtype: dict
        """
        # .get() rather than indexing: a session can be non-empty (e.g. it
        # holds only 'next') without containing 'oauth_user_uri'.
        uri = session.get('oauth_user_uri')
        if not uri:
            return None
        return {'uri': uri, 'name': session.get('oauth_user_name', uri)}

    @staticmethod
    def oauth_required(f):
        """ decorator to add to a view to require an oauth user

        When the session has no ``oauth_user_uri`` the request is redirected
        to the login route, carrying the url to return to afterwards.

        :param f: the view function to protect
        :type f: func
        :return: decorated function
        :rtype: func
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if session.get('oauth_user_uri') is None:
                if request.method == 'POST':
                    # we should really handle redirects on POSTS too but they
                    # should only happen if the session timed out in between
                    # requesting a page and submitting it so to keep it
                    # simple just redirect POSTs to the index page
                    next_url = url_for('.index', _external=True)
                else:
                    next_url = request.url
                return redirect(
                    url_for(
                        '.r_oauth_login',
                        next=next_url,
                        _external=True))
            return f(*args, **kwargs)
        return decorated_function