jupyter login

 anaconda2/lib/python2.7/site-packages/notebook/auth/login.py

"""Tornado handlers for logging into the notebook."""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import re
import urllib2
import json
import os

try:
    from urllib.parse import urlparse # Py 3
except ImportError:
    from urlparse import urlparse # Py 2
import uuid

from tornado.escape import url_escape

from ..auth.security import passwd_check

from ..base.handlers import IPythonHandler


class LoginHandler(IPythonHandler):
    """The basic tornado login handler

    authenticates with a hashed password from the configuration.
    """
    def _render(self, message=None):
        self.write(self.render_template('login.html',
                next=url_escape(self.get_argument('next', default=self.base_url)),
                message=message,
        ))

    def _redirect_safe(self, url, default=None):
        """Redirect if url is on our PATH

        Full-domain redirects are allowed if they pass our CORS origin checks.

        Otherwise use default (self.base_url if unspecified).
        """
        if default is None:
            default = self.base_url
        if not url.startswith(self.base_url):
            # require that next_url be absolute path within our path
            allow = False
            # OR pass our cross-origin check
            if '://' in url:
                # if full URL, run our cross-origin check:
                parsed = urlparse(url.lower())
                origin = '%s://%s' % (parsed.scheme, parsed.netloc)
                if self.allow_origin:
                    allow = self.allow_origin == origin
                elif self.allow_origin_pat:
                    allow = bool(self.allow_origin_pat.match(origin))
            if not allow:
                # not allowed, use default
                self.log.warning("Not allowing login redirect to %r" % url)
                url = default
        self.redirect(url)

    def get(self):
        if self.current_user:
            next_url = self.get_argument('next', default=self.base_url)
            self._redirect_safe(next_url)
        else:
            self._render()

    @property
    def hashed_password(self):
        return self.password_from_settings(self.settings)

    def passwd_check(self, a, b):
        return passwd_check(a, b)

    #
    # rr
    #
    def sas_token_check(self, t):
        try:
            if not t or t == '':
                return None
            token_url = os.environ['SAS_TOKEN_URL']
            if token_url == '':
                return None
            req = urllib2.Request(url=token_url+t)
            r = json.loads(urllib2.urlopen(req).read())
            if r['err'] != 0:
                return None
            if r['data'] is None or r['data'] == '':
                return None
        except:
            return None

        return r['data']
    
    def post(self):
        typed_password = self.get_argument('password', default=u'')
        typed_sas_token = self.get_argument('sas_token', default=u'')
        if self.get_login_available(self.settings):
            if self.passwd_check(self.hashed_password, typed_password) and self.sas_token_check(typed_sas_token):
                self.set_login_cookie(self, uuid.uuid4().hex)
                # save sas token to cookie
                os.environ['SAS_SUBMIT_USER'] = typed_sas_token
                self.set_cookie_val(self, "sas.submit.user", typed_sas_token)
                # decode sas token
                typed_sas_token = self.sas_token_check(typed_sas_token)
                # save decoded sas token to cookie
                try:
                    os.environ['SAS_SUBMIT_USER_REAL'] =re.match(r'^DSP_(.+)_d+$', typed_sas_token).groups()[0] 
                except:
                    os.environ['SAS_SUBMIT_USER_REAL'] =re.match(r'^DSP_(.+)$', typed_sas_token).groups()[0] 
                self.set_cookie_val(self, "sas.submit.user.real", os.environ['SAS_SUBMIT_USER_REAL'])
            #elif self.token and self.token == typed_password:
            #    self.set_login_cookie(self, uuid.uuid4().hex)
            else:
                self.set_status(401)
                self._render(message={'error': 'Invalid password or SAS Token'})
                return

        next_url = self.get_argument('next', default=self.base_url)
        self._redirect_safe(next_url)

    #
    # rr
    #
    @classmethod
    def set_cookie_val(cls, handler, key, value):
        """Call this on handlers to set the login cookie for success"""
        cookie_options = handler.settings.get('cookie_options', {})
        cookie_options.setdefault('httponly', True)
        # tornado <4.2 has a bug that considers secure==True as soon as
        # 'secure' kwarg is passed to set_secure_cookie
        if handler.settings.get('secure_cookie', handler.request.protocol == 'https'):
            cookie_options.setdefault('secure', True)
        handler.set_secure_cookie(key, value, **cookie_options)

    @classmethod
    def set_login_cookie(cls, handler, user_id=None):
        """Call this on handlers to set the login cookie for success"""
        cookie_options = handler.settings.get('cookie_options', {})
        cookie_options.setdefault('httponly', True)
        # tornado <4.2 has a bug that considers secure==True as soon as
        # 'secure' kwarg is passed to set_secure_cookie
        if handler.settings.get('secure_cookie', handler.request.protocol == 'https'):
            cookie_options.setdefault('secure', True)
        handler.set_secure_cookie(handler.cookie_name, user_id, **cookie_options)
        return user_id

    auth_header_pat = re.compile('tokens+(.+)', re.IGNORECASE)

    @classmethod
    def get_token(cls, handler):
        """Get the user token from a request

        Default:

        - in URL parameters: ?token=<token>
        - in header: Authorization: token <token>
        """

        user_token = handler.get_argument('token', '')
        if not user_token:
            # get it from Authorization header
            m = cls.auth_header_pat.match(handler.request.headers.get('Authorization', ''))
            if m:
                user_token = m.group(1)
        return user_token

    @classmethod
    def should_check_origin(cls, handler):
        """Should the Handler check for CORS origin validation?

        Origin check should be skipped for token-authenticated requests.

        Returns:
        - True, if Handler must check for valid CORS origin.
        - False, if Handler should skip origin check since requests are token-authenticated.
        """
        return not cls.is_token_authenticated(handler)

    @classmethod
    def is_token_authenticated(cls, handler):
        """Returns True if handler has been token authenticated. Otherwise, False.

        Login with a token is used to signal certain things, such as:

        - permit access to REST API
        - xsrf protection
        - skip origin-checks for scripts
        """
        if getattr(handler, '_user_id', None) is None:
            # ensure get_user has been called, so we know if we're token-authenticated
            handler.get_current_user()
        return getattr(handler, '_token_authenticated', False)

    @classmethod
    def get_user(cls, handler):
        """Called by handlers.get_current_user for identifying the current user.

        See tornado.web.RequestHandler.get_current_user for details.
        """
        # Can't call this get_current_user because it will collide when
        # called on LoginHandler itself.
        if getattr(handler, '_user_id', None):
            return handler._user_id
        user_id = cls.get_user_token(handler)
        if user_id is None:
            user_id = handler.get_secure_cookie(handler.cookie_name)
            try:
                os.environ['SAS_SUBMIT_USER'] = handler.get_secure_cookie("sas.submit.user")
                os.environ['SAS_SUBMIT_USER_REAL'] = handler.get_secure_cookie("sas.submit.user.real")
            except:
                pass
        else:
            cls.set_login_cookie(handler, user_id)
            # Record that the current request has been authenticated with a token.
            # Used in is_token_authenticated above.
            handler._token_authenticated = True
        if user_id is None:
            # prevent extra Invalid cookie sig warnings:
            handler.clear_login_cookie()
            if not handler.login_available:
                # Completely insecure! No authentication at all.
                # No need to warn here, though; validate_security will have already done that.
                user_id = 'anonymous'

        # cache value for future retrievals on the same request
        handler._user_id = user_id
        return user_id

    @classmethod
    def get_user_token(cls, handler):
        """Identify the user based on a token in the URL or Authorization header
        
        Returns:
        - uuid if authenticated
        - None if not
        """
        token = handler.token
        if not token:
            return
        # check login token from URL argument or Authorization header
        user_token = cls.get_token(handler)
        one_time_token = handler.one_time_token
        authenticated = False
        if user_token == token:
            # token-authenticated, set the login cookie
            handler.log.debug("Accepting token-authenticated connection from %s", handler.request.remote_ip)
            authenticated = True
        elif one_time_token and user_token == one_time_token:
            # one-time-token-authenticated, only allow this token once
            handler.settings.pop('one_time_token', None)
            handler.log.info("Accepting one-time-token-authenticated connection from %s", handler.request.remote_ip)
            authenticated = True

        if authenticated:
            return uuid.uuid4().hex
        else:
            return None


    @classmethod
    def validate_security(cls, app, ssl_options=None):
        """Check the notebook application's security.

        Show messages, or abort if necessary, based on the security configuration.
        """
        if not app.ip:
            warning = "WARNING: The notebook server is listening on all IP addresses"
            if ssl_options is None:
                app.log.warning(warning + " and not using encryption. This "
                    "is not recommended.")
            if not app.password and not app.token:
                app.log.warning(warning + " and not using authentication. "
                    "This is highly insecure and not recommended.")
        else:
            if not app.password and not app.token:
                app.log.warning(
                    "All authentication is disabled."
                    "  Anyone who can connect to this server will be able to run code.")

    @classmethod
    def password_from_settings(cls, settings):
        """Return the hashed password from the tornado settings.

        If there is no configured password, an empty string will be returned.
        """
        return settings.get('password', u'')

    @classmethod
    def get_login_available(cls, settings):
        """Whether this LoginHandler is needed - and therefore whether the login page should be displayed."""
        return bool(cls.password_from_settings(settings) or settings.get('token'))
原文地址:https://www.cnblogs.com/moonypog/p/11064507.html