Source code for schedula.utils.form.gapp

# coding=utf-8
# -*- coding: UTF-8 -*-
#
# Copyright 2015-2024, Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides the `gunicorn` BaseApplication to run the server in production.
"""
import sys
import runpy
import functools
import os.path as osp
import multiprocessing
from gunicorn.app.base import BaseApplication


@functools.lru_cache()
def _get_module(module, path):
    return runpy.run_module(module)


def get_module(module=None, extra_sys_paths=()):
    """
    Run one or more modules and return their merged global namespaces.

    :param module:
        A module name, or an iterable of module names, to execute. When it is
        falsy nothing is executed and an empty dictionary is returned.
    :type module: str | collections.abc.Iterable[str], optional

    :param extra_sys_paths:
        Directories that are temporarily placed, as absolute paths and in the
        given order, at the front of `sys.path` while the modules run.
    :type extra_sys_paths: tuple[str] | list[str], optional

    :return:
        The globals of every executed module merged into one dictionary;
        names defined by later modules override those of earlier ones.
    :rtype: dict
    """
    gbl = {}
    if not module:
        return gbl

    names = [module] if isinstance(module, str) else module
    # Keep the very same list object: other code may hold a reference to it,
    # so the contents are restored in place instead of rebinding `sys.path`
    # to a copy.
    path_list = sys.path
    snapshot = list(path_list) if extra_sys_paths else None
    try:
        # Reverse iteration + insert(0) keeps the caller's order at the front.
        for d in extra_sys_paths[::-1]:
            sys.path.insert(0, osp.abspath(d))
        # The effective search path is part of the cache key of `_get_module`.
        cpath = tuple(sys.path)
        for name in names:
            gbl.update(_get_module(name, cpath))
    finally:
        if snapshot is not None:
            path_list[:] = snapshot
        # Undo any rebinding of `sys.path` that happened while running.
        sys.path = path_list
    return gbl
class Application(BaseApplication):
    """
    `gunicorn` application that serves an already constructed app object.

    The gunicorn settings are collected in `self.options` by `__init__` and
    pushed into `self.cfg` by `load_config`.
    """

    def __init__(self, app, workers=None, timeout=0, threads=10,
                 accesslog='-', **options):
        """
        Store the app and the gunicorn options, then initialise the base class.

        :param app:
            Application object returned by `load`.
        :type app: object

        :param workers:
            Number of workers. When falsy it defaults to
            `multiprocessing.cpu_count() * 2 + 1`.
        :type workers: int, optional

        :param timeout:
            Value of the gunicorn `timeout` setting.
        :type timeout: int, optional

        :param threads:
            Value of the gunicorn `threads` setting.
        :type threads: int, optional

        :param accesslog:
            Value of the gunicorn `accesslog` setting.
        :type accesslog: str, optional

        :param options:
            Additional gunicorn settings, keyed by setting name.
        :type options: dict
        """
        self.options = {
            'threads': threads,
            'timeout': timeout,
            'workers': workers or (multiprocessing.cpu_count() * 2) + 1,
            'accesslog': accesslog,
            **options
        }
        self.application = app
        super().__init__()

    def load_config(self):
        """
        Validate `self.options` and apply them to `self.cfg`.

        Options whose value is `None` are skipped.

        :raises ValueError:
            If any option name is not a key of `self.cfg.settings`.
        """
        # NOTE(review): `self.cfg` is not set in this class; it is presumably
        # provided by `BaseApplication` before this hook runs.
        settings = self.cfg.settings
        # Sorted so that the error message is deterministic.
        invalid = sorted(key for key in self.options if key not in settings)
        if invalid:
            raise ValueError(
                f'Invalid gunicorn options `{", ".join(invalid)}`.'
            )
        for key, value in self.options.items():
            if value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        """
        Return the application object given to the constructor.

        :return:
            The `app` passed to `__init__`.
        :rtype: object
        """
        return self.application