您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

结合Flask-restless,Flask-security和常规Python请求

结合Flask-restless,Flask-security和常规Python请求

这是我修改的最小示例:

from datetime import timedelta

from flask import Flask, render_template, request, url_for, redirect
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.security import Security, SQLAlchemyUserDatastore, \
    UserMixin, RoleMixin, login_required, current_user, logout_user
from flask.ext.restless import APIManager
from flask.ext.restless import ProcessingException
from flask.ext.login import user_logged_in
# JWT imports
from flask_jwt import JWT, jwt_required

# Create app
# NOTE: DEBUG mode and the hard-coded SECRET_KEY are for this demo only.
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'super-secret'
# Config keys are case-sensitive: the key must be spelled in upper case for
# Flask-SQLAlchemy to pick it up. 'sqlite://' (no path) is an in-memory DB.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://'
# expiration delay for tokens (here is one minute)
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=60)

# Create database connection object
db = SQLAlchemy(app)

# creates the JWT Token authentication  ======================================
jwt = JWT(app)
@jwt.authentication_handler
def authenticate(username, password):
    """Return the user matching the submitted credentials, or None.

    Registered as the Flask-JWT authentication handler.

    :param username: login name sent by the client; looked up as the
        user's e-mail address.
    :param password: password sent by the client.
    :returns: the matching user object, or ``None`` when the e-mail is
        unknown or the password does not match.
    """
    user = user_datastore.find_user(email=username)
    if user is None:
        # Unknown e-mail: reject the login instead of failing on
        # ``user.email`` with an AttributeError.
        return None
    # Debug trace; the call form prints the same text on Python 2 and 3.
    print('%s vs. %s' % (username, user.email))
    # NOTE(review): the stored password is compared verbatim, which only
    # works while passwords are stored unhashed -- confirm the
    # Flask-Security password settings before reusing this.
    if username == user.email and password == user.password:
        return user
    return None

@jwt.user_handler
def load_user(payload):
    """Fetch the user whose id is stored under ``user_id`` in the token payload."""
    return user_datastore.find_user(id=payload['user_id'])

# Define Flask-security models ===============================================
# Association table pairing a user id with a role id.
roles_users = db.Table(
    'roles_users',
    db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('role.id')),
)

class Role(db.Model, RoleMixin):
    """Role model handed to the Flask-Security user datastore.

    Users are attached through the ``roles_users`` association table; the
    ``User.roles`` relationship adds a ``users`` backref on this class.
    """
    # Integer primary key.
    id = db.Column(db.Integer(), primary_key=True)
    # Role name, at most 80 characters, unique across roles.
    name = db.Column(db.String(80), unique=True)
    # Free-text description, at most 255 characters.
    description = db.Column(db.String(255))

class User(db.Model, UserMixin):
    """User model handed to the Flask-Security user datastore."""
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Unique e-mail address; the JWT authentication handler looks users up
    # by this column.
    email = db.Column(db.String(255), unique=True)
    # Password as stored by the datastore; the JWT authentication handler
    # compares it directly with the submitted password.
    password = db.Column(db.String(255))
    # Boolean flag -- presumably the Flask-Security "account enabled"
    # switch; not read anywhere in this file.
    active = db.Column(db.Boolean())
    # Timestamp column -- presumably maintained by Flask-Security; not read
    # anywhere in this file.
    confirmed_at = db.Column(db.DateTime())
    # Many-to-many link to Role through ``roles_users``; the backref exposes
    # ``Role.users`` as a dynamic (query-returning) relationship.
    roles = db.relationship('Role', secondary=roles_users,
                            backref=db.backref('users', lazy='dynamic'))
# Some additional stuff to query over...
class SomeStuff(db.Model):
    """Sample payload table exposed through the Flask-Restless endpoints."""
    __tablename__ = 'somestuff'
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Arbitrary integer payload.
    data1 = db.Column(db.Integer)
    # Arbitrary string payload, at most 10 characters.
    data2 = db.Column(db.String(10))
    # Optional owner: foreign key to ``user.id``, may be NULL.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    # Owning User, loaded eagerly via a join (lazy='joined'); declared
    # viewonly, so ownership is set through ``user_id``.
    user = db.relationship(User, lazy='joined', join_depth=1, viewonly=True)
# Setup Flask-Security
# The datastore class is spelled ``SQLAlchemyUserDatastore``; the lower-case
# ``sql`` spelling does not exist in Flask-Security and raised a NameError.
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)

# Flask-Restless API ==========================================================
@jwt_required()
def auth_func(**kw):
    """Flask-Restless preprocessor that only admits JWT-authenticated requests.

    All the work is done by the ``jwt_required`` decorator, which rejects
    the request before this body runs when no valid token is supplied.

    :param kw: keyword arguments supplied by Flask-Restless (ignored).
    :returns: ``None``.
    """
    # Deliberately return nothing: Flask-Restless releases that let a
    # single-instance preprocessor return a replacement instance id would
    # treat a ``True`` result as id 1 and serve the wrong row.
    return None

apimanager = APIManager(app, flask_sqlalchemy_db=db)

# Open endpoint: /api/v1/free_stuff, no preprocessors attached.
apimanager.create_api(
    SomeStuff,
    collection_name='free_stuff',
    url_prefix='/api/v1',
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    include_columns=['data1', 'data2', 'user_id'],
)

# Guarded endpoint: /api/v1/protected_stuff, reads go through auth_func.
# NOTE(review): only the GET preprocessors are registered, so POST, PUT and
# DELETE on this collection are not covered by auth_func -- confirm whether
# that is intended.
apimanager.create_api(
    SomeStuff,
    collection_name='protected_stuff',
    url_prefix='/api/v1',
    methods=['GET', 'POST', 'DELETE', 'PUT'],
    include_columns=['data1', 'data2', 'user_id'],
    preprocessors={'GET_SINGLE': [auth_func], 'GET_MANY': [auth_func]},
)

# Create some users to test with
@app.before_first_request
def create_user():
    """Create the tables and seed two users plus two SomeStuff rows."""
    db.create_all()
    # Each test account uses its login name as its password.
    for login in ('test', 'test2'):
        user_datastore.create_user(email=login, password=login)
    # Both sample rows are attached to user id 1.
    for number, label in ((2, 'toto'), (5, 'titi')):
        db.session.add(SomeStuff(data1=number, data2=label, user_id=1))
    db.session.commit()

# Views
@app.route('/')
@login_required
def home():
    """Render the index page for a logged-in user, echoing the request headers to stdout."""
    incoming_headers = request.headers
    print(incoming_headers)
    page = render_template('index.html')
    return page

@app.route('/logout/')
def log_out():
    """Log the current user out and redirect to a page on this site.

    The ``next`` query parameter is honoured only when it is a local path
    (starts with a single ``/``). Anything else falls back to ``/``.

    :returns: a redirect response.
    """
    logout_user()
    target = request.args.get('next') or '/'
    # ``next`` comes straight from the query string. Absolute URLs and
    # scheme-relative forms ('//host', '/\\host') would turn this view into
    # an open redirect, so only plain local paths are accepted.
    is_local_path = target.startswith('/') and target[1:2] not in ('/', '\\')
    if not is_local_path:
        target = '/'
    return redirect(target)

# Start the Flask server only when this file is executed as a script.
if __name__ == '__main__':
    app.run()

然后,通过 requests 与它交互:

>>>  import requests, json   
>>>  r=requests.get('http://127.0.0.1:5000/api/v1/free_stuff')  # this is OK   
>>>  print 'status:', r.status_code
status: 200   
>>>  r=requests.get('http://127.0.0.1:5000/api/v1/protected_stuff')  # this should fail   
>>>  print 'status:', r.status_code
status: 401   
>>>  print r.json()
{u'status_code': 401, 
u'description': u'Authorization header was missing', 
u'error':    u'Authorization required'}   
>>>  # Authenticate and retrieve Token   
>>>  r = requests.post('http://127.0.0.1:5000/auth', 
...:                   data=json.dumps({'username': 'test', 'password': 'test'}),
...:                   headers={'content-type': 'application/json'}
...:                   )   
>>>  print 'status:', r.status_code
status: 200   
>>>  token = r.json()['token']   
>>>  # Now we have the token, we can navigate to restricted area:   
>>>  r = requests.get('http://127.0.0.1:5000/api/v1/protected_stuff', 
...:                   headers={'Authorization': 'Bearer %s' % token})   
>>>  print 'status:', r.status_code
status: 200 
Python 2022/1/1 18:20:27 有453人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶