Source code for schedula.utils.web

# coding=utf-8
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2024, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides functions to build a flask app from a dispatcher.
"""
import gzip
import base64
import logging
import functools
from ..exc import WebResponse
from ..drw import SiteMap, SiteFolder, FolderNode, SiteNode
from ..base import Base

__author__ = 'Vincenzo Arcidiacono <vinci1it2000@gmail.com>'

log = logging.getLogger(__name__)


class FolderNodeWeb(FolderNode):
    """
    Folder node used by the web map.

    Only the `function` attribute is listed in `node_function`; `node_data`
    and `edge_data` are left empty.
    """
    node_data = ()  # ('+set_value',)
    node_function = ('+function',)  # ('+input_domain',)
    edge_data = ()

    def _set_value(self):
        """
        Yield a setter for the default value of a data node.

        :return:
            Generator yielding one `('', callable)` pair when the node is of
            type `'data'` and its id is in `self.folder.dsp.nodes`; the
            callable is `dsp.set_default_value` pre-bound to the node id.
            Otherwise nothing is yielded.
        """
        if self.type != 'data':
            return
        dsp = self.folder.dsp
        if self.node_id not in dsp.nodes:
            return
        yield '', functools.partial(dsp.set_default_value, self.node_id)

    def _function(self):
        """
        Yield the node's `function` attribute, when present.

        :return:
            Generator yielding one `(label, function)` pair, where the label
            is `''` for nodes of type `'function'` and `'function'` for any
            other node type. Nothing is yielded when the attribute is missing.
        """
        key = 'function'
        if key not in self.attr:
            return
        label = '' if self.type == 'function' else key
        yield label, self.attr[key]
class WebFolder(SiteFolder):
    """
    Site folder of the web map: it uses `FolderNodeWeb` as folder node class
    and an empty `ext`.
    """
    # Node class used for the nodes of this folder.
    folder_node = FolderNodeWeb
    # Empty extension (presumably so that no file suffix is appended to the
    # generated paths -- TODO confirm against `SiteFolder`).
    ext = ''
class WebNode(SiteNode):
    """
    Site node of the web map: it has an empty `ext` and it is named after its
    node id.
    """
    # Empty extension (presumably so that no file suffix is appended to the
    # generated paths -- TODO confirm against `SiteNode`).
    ext = ''

    @property
    def name(self):
        """
        Name of the node.

        :return:
            The node id (`self.node_id`), unchanged.
        """
        return self.node_id
class WebMap(SiteMap):
    """
    Site map that builds a flask app exposing the mapped nodes as JSON
    endpoints, plus a `/subsite/<key>/` proxy towards debug sites.
    """
    # Placeholders that accept any arguments and return None.
    _view = site_index = lambda *args, **kwargs: None
    site_folder = WebFolder
    site_node = WebNode
    include_folders_as_filenames = False
    # HTTP methods accepted by the API rules registered in `app`.
    methods = ['POST']
    # HTTP methods accepted by the `/subsite/...` proxy rules.
    subsite_methods = ['GET', 'POST']
    # Forwarded as `idle_timeout` to the debug sites started by
    # `init_debug_subsite`.
    idle_timeout = 600

    def _repr_svg_(self):
        raise NotImplementedError()

    def basic_app(self, root_path, mute=True, blueprint_name=None, **kwargs):
        """
        Build the parent's basic app and register the `before_request` hook.

        :param root_path:
            Forwarded to the parent `basic_app`.
        :param mute:
            Forwarded to the parent `basic_app`.
        :param blueprint_name:
            Forwarded to the parent `basic_app`.
        :param kwargs:
            Extra keyword arguments forwarded to the parent `basic_app`.
        :return:
            The app returned by the parent, with `before_request` registered.
        """
        app = super().basic_app(
            root_path, mute=mute, blueprint_name=blueprint_name, **kwargs
        )
        app.before_request(self.before_request)
        return app

    def app(self, root_path=None, depth=-1, mute=False, blueprint_name=None,
            **kwargs):
        """
        Build the app and register one API rule per mapped node.

        The first item returned by `self.rules` is served both at `/` and at
        its own path under the endpoint `api`; every other item is served at
        its path under the endpoint `api_<path>`. The `/subsite/<key>/...`
        rules are routed to `_site_proxy`.

        :param root_path:
            Forwarded to `basic_app`.
        :param depth:
            Forwarded to `self.rules`.
        :param mute:
            Forwarded to `basic_app`.
        :param blueprint_name:
            Forwarded to `basic_app`.
        :param kwargs:
            Extra keyword arguments forwarded to `basic_app`; an `index` key,
            if given, is discarded.
        :return:
            The configured app.
        """
        kwargs.pop('index', None)  # Rules are always built with index=False.
        app = self.basic_app(
            root_path, mute=mute, blueprint_name=blueprint_name, **kwargs
        )
        app.after_request(self.after_request)
        context = self.rules(depth=depth, index=False)
        opt = {'methods': self.methods}
        for i, ((node, _extra), path) in enumerate(context.items()):
            view = functools.partial(self._func_handler, node.obj)
            if i:
                app.add_url_rule('/%s' % path, f'api_{path}', view, **opt)
            else:
                app.add_url_rule('/', 'api', view, **opt)
                # Second URL for the same endpoint: the view is already bound.
                app.add_url_rule('/%s' % path, 'api', **opt)
        opt = {'methods': self.subsite_methods}
        app.add_url_rule('/subsite/<key>/', 'subsite', self._site_proxy, **opt)
        app.add_url_rule('/subsite/<key>/<path:path>', 'subsite', **opt)
        app.add_url_rule('/subsite/<key>/<string:path>', 'subsite', **opt)
        return app

    def render(self, *args, **kwargs):
        raise NotImplementedError()

    def init_debug_subsite(self, func):
        """
        Start a debug site for `func` and register it as a proxied subsite.

        It must be called while handling a request, because it uses
        `flask.url_for`.

        :param func:
            Object exposing `plot(workflow=True, view=False)`, whose result
            provides `site(...)`.
        :return:
            URL of the `subsite` endpoint built for the generated key.
        """
        # The key is the only secret protecting the proxied debug site, hence
        # it is drawn from `secrets` rather than from the `random` PRNG.
        import secrets
        from flask import url_for
        from string import ascii_lowercase, digits
        alphabet = ascii_lowercase + digits
        # Lowercase only: `_site_proxy` lowercases the key before the lookup.
        key = ''.join(secrets.choice(alphabet) for _ in range(12))
        upx = url_for('subsite', key=key)
        site = func.plot(workflow=True, view=False).site(
            index=True, url_prefix=upx, blueprint_name='debug',
            idle_timeout=self.idle_timeout
        ).run()
        # NOTE(review): `self.subsites` is not defined in this class; it is
        # presumably provided by `SiteMap` -- confirm.
        self.subsites[key] = (site.url, site.shutdown)
        return upx

    @staticmethod
    def before_request():
        """
        Wrap the request stream in a gzip reader when the request declares
        `Content-Encoding: gzip`.
        """
        from flask import request
        if request.headers.get('Content-Encoding') == 'gzip':
            request.stream = gzip.GzipFile(fileobj=request.stream)

    @staticmethod
    def after_request(response):
        """
        Attach the flashed messages, if any, to the response headers.

        The messages are dumped as JSON into `X-Flash-Messages`. When the
        client accepts gzip, the JSON is gzip-compressed and base64-encoded,
        and `X-Flash-Messages-length` / `X-Flash-Messages-Encoding` are set.

        :param response:
            Response about to be sent.
        :return:
            The same response object, possibly with extra headers.
        """
        from flask import request, current_app, get_flashed_messages
        messages = get_flashed_messages(with_categories=True)
        if messages:
            headers = {}
            payload = current_app.json.dumps(messages).encode('utf8')
            if 'gzip' in request.headers.get('Accept-Encoding', '').lower():
                payload = base64.b64encode(gzip.compress(payload))
                headers['X-Flash-Messages-length'] = len(payload)
                headers['X-Flash-Messages-Encoding'] = 'gzip'
            # Header values have to be `str`: recent werkzeug releases no
            # longer decode `bytes` and would send their repr (``b'...'``).
            # latin-1 maps each byte to one code point, so the bytes on the
            # wire are the same as when werkzeug decoded them itself.
            headers['X-Flash-Messages'] = payload.decode('latin-1')
            response.headers.update(headers)
        return response

    def _func_handler(self, func):
        """
        Flask view that calls `func` with the arguments found in the request.

        The JSON body may define `args` and `kwargs`; an empty non-JSON body
        is treated as `{}`. The query argument `data` (default
        `'return,error'`) is a comma-separated list of the keys to send back,
        chosen among `input`, `return` and `error`.

        :param func:
            Callable to invoke.
        :return:
            The response built by `func` (returned directly or raised through
            `WebResponse`), otherwise a JSON response with the selected keys,
            gzip-compressed when the client accepts it.
        """
        from ..dsp import selector
        from flask import request, current_app, Response
        resp = None
        data = {}
        try:
            if not (request.is_json or request.get_data()):
                inp = {}
            else:
                inp = request.get_json(force=True)
            data['input'] = inp
            data['return'] = func(*inp.get('args', ()), **inp.get('kwargs', {}))
            if isinstance(data['return'], Response):
                resp = data['return']
        except WebResponse as ex:
            resp = ex.response
        except Exception as ex:
            # Deliberate boundary: any failure is reported to the client in
            # the `error` key instead of propagating.
            data['error'] = str(ex)
        # NOTE(review): this Content-Type is also applied to responses built
        # by `func` / `WebResponse` -- confirm the override is intended.
        headers = {'Content-Type': 'application/json'}
        if resp is None:
            keys = request.args.get('data', 'return,error').split(',')
            keys = [v.strip(' ') for v in keys]
            content = current_app.json.dumps(selector(
                keys, data, allow_miss=True
            )).encode('utf8')
            if 'gzip' in request.headers.get('Accept-Encoding', '').lower():
                content = gzip.compress(content)
                headers['Content-length'] = len(content)
                headers['Content-Encoding'] = 'gzip'
            resp = current_app.make_response(content)

        if request.headers.get('Debug') == 'true' and isinstance(func, Base):
            headers['Debug-Location'] = self.init_debug_subsite(func)

        resp.headers.update(headers)
        return resp

    def _site_proxy(self, key, path=''):
        """
        Forward the current request to the debug subsite registered as `key`.

        :param key:
            Subsite key, as generated by `init_debug_subsite`; it is
            lowercased before the lookup.
        :param path:
            Path to request on the subsite.
        :return:
            The subsite's response (body, status and filtered headers). It
            aborts with 404 when the key is unknown and with 503, after
            dropping the key, when the subsite cannot be reached.
        """
        import requests
        from flask import request, current_app, abort
        key = key.lower()
        host_url = self.subsites.get(key, (None,))[0]
        if not host_url:
            return abort(404)
        try:
            # NOTE(review): no timeout is set and the query string is not
            # forwarded -- confirm both are intended.
            resp = requests.request(
                method=request.method,
                url='/'.join((host_url, path or '')),
                headers={k: v for k, v in request.headers if k != 'Host'},
                data=request.get_data(),
                cookies=request.cookies,
                allow_redirects=False
            )
        except requests.ConnectionError:
            self.subsites.pop(key, None)
            return abort(503)
        # These headers describe the upstream transfer, not the body that is
        # re-sent from `resp.content`, hence they are not copied.
        excluded_headers = frozenset((
            'content-encoding', 'content-length', 'transfer-encoding',
            'connection'
        ))
        headers = [
            (k, v) for k, v in resp.raw.headers.items()
            if k.lower() not in excluded_headers
        ]
        return current_app.response_class(
            resp.content, resp.status_code, headers
        )