3. Tutorial¶
Let’s assume that we want develop a tool to automatically manage the symmetric cryptography. The base idea is to open a file, read its content, encrypt or decrypt the data and then write them out to a new file. This tutorial shows how to:
Note
You can find more examples, on how to use the schedula library, into the folder examples.
3.1. Model definition¶
First of all we start defining an empty Dispatcher
named symmetric_cryptography that defines the dataflow execution model:
>>> import schedula as sh
>>> dsp = sh.Dispatcher(name='symmetric_cryptography')
There are two main ways to get a key, we can either generate a new one or use
one that has previously been generated. Hence, we can define three functions to
simply generate, save, and load the key. To automatically populate the model
inheriting the arguments names, we can use the decorator
add_function()
as follow:
>>> import os.path as osp
>>> from cryptography.fernet import Fernet
>>> @sh.add_function(dsp, outputs=['key'], weight=2)
... def generate_key():
... return Fernet.generate_key().decode()
>>> @sh.add_function(dsp)
... def write_key(key_fpath, key):
... with open(key_fpath, 'w') as f:
... f.write(key)
>>> @sh.add_function(dsp, outputs=['key'], input_domain=osp.isfile)
... def read_key(key_fpath):
... with open(key_fpath) as f:
... return f.read()
Note
Since Python does not come with anything that can encrypt/decrypt files, in
this tutorial, we use a third party module named cryptography
. To install
it execute pip install cryptography
.
To encrypt/decrypt a message, you will need a key as previously defined and your data encrypted or decrypted. Therefore, we can define two functions and add them, as before, to the model:
>>> @sh.add_function(dsp, outputs=['encrypted'])
... def encrypt_message(key, decrypted):
... return Fernet(key.encode()).encrypt(decrypted.encode()).decode()
>>> @sh.add_function(dsp, outputs=['decrypted'])
... def decrypt_message(key, encrypted):
... return Fernet(key.encode()).decrypt(encrypted.encode()).decode()
Finally, to read and write the encrypted or decrypted message, according to the
functional programming philosophy, we can reuse the previously defined functions
read_key
and write_key
changing the model mapping (i.e., function_id,
inputs, and outputs). To add to the model, we can simply use the
add_function
method as follow:
>>> dsp.add_function(
... function_id='read_decrypted',
... function=read_key,
... inputs=['decrypted_fpath'],
... outputs=['decrypted']
... )
'read_decrypted'
>>> dsp.add_function(
... 'read_encrypted', read_key, ['encrypted_fpath'], ['encrypted'],
... input_domain=osp.isfile
... )
'read_encrypted'
>>> dsp.add_function(
... 'write_decrypted', write_key, ['decrypted_fpath', 'decrypted'],
... input_domain=osp.isfile
... )
'write_decrypted'
>>> dsp.add_function(
... 'write_encrypted', write_key, ['encrypted_fpath', 'encrypted']
... )
'write_encrypted'
Note
For more details on how to create a Dispatcher
see: add_data()
,
add_func()
,
add_function()
,
add_dispatcher()
,
SubDispatch
,
SubDispatchFunction
,
SubDispatchPipe
, and
DispatchPipe
.
To inspect and visualize the dataflow execution model, you can simply plot the graph as follow:
>>> dsp.plot()
Tip
You can explore the diagram by clicking on it.
3.2. Dispatching¶
To see the dataflow execution model in action and its workflow to generate a
key, to encrypt a message, and to write the encrypt data, you can simply invoke
dispatch()
or
__call__()
methods of the dsp
:
>>> import tempfile
>>> tempdir = tempfile.mkdtemp()
>>> message = "secret message"
>>> sol = dsp(inputs=dict(
... decrypted=message,
... encrypted_fpath=osp.join(tempdir, 'data.secret'),
... key_fpath=osp.join(tempdir,'key.key')
... ))
>>> sol.plot(index=True) # doctest: +SKIP
Note
As you can see from the workflow graph (orange nodes), when some function’s inputs does not respect its domain, the Dispatcher automatically finds an alternative path to estimate all computable outputs. The same logic applies when there is a function failure.
Now to decrypt the data and verify the message without saving the decrypted
message, you just need to execute again the dsp
changing the inputs and
setting the desired outputs. In this way, the dispatcher automatically
selects and executes only a sub-part of the dataflow execution model.
>>> dsp(
... inputs=sh.selector(('encrypted_fpath', 'key_fpath'), sol),
... outputs=['decrypted']
... )['decrypted'] == message
True
If you want to visualize the latest workflow of the dispatcher, you can use the
plot()
method with the keyword
workflow=True
:
>>> dsp.plot(workflow=True, index=True) # doctest: +SKIP
3.3. Sub-model extraction¶
A good security practice, when design a light web API service, is to avoid the unregulated access to the system’s reading and writing features. Since our current dataflow execution model exposes these functionality, we need to extract sub-model without read/write of key and message functions:
>>> api = dsp.get_sub_dsp((
... 'decrypt_message', 'encrypt_message', 'key', 'encrypted',
... 'decrypted', 'generate_key', sh.START
... ))
Note
For more details how to extract a sub-model see:
shrink_dsp()
,
get_sub_dsp()
,
get_sub_dsp_from_workflow()
,
SubDispatch
,
SubDispatchFunction
,
DispatchPipe
, and
SubDispatchPipe
.
3.4. API server¶
Now that the api
model is secure, we can deploy our web API service.
schedula allows to convert automatically a
Dispatcher
to a web API service using the
web()
method. By default, it exposes the
dispatch()
method of the Dispatcher and
maps all its functions and sub-dispatchers. Each of these APIs are commonly
called endpoints. You can launch the server with the code below:
>>> server = api.web().site(host='127.0.0.1', port=5000).run()
>>> url = server.url; url
'http://127.0.0.1:5000'
Note
When server
object is garbage collected, the server shutdowns
automatically. To force the server shutdown, use its method
server.shutdown()
.
Once the server is running, you can try out the encryption functionality making
a JSON POST request, specifying the args and kwargs of the
dispatch()
method, as follow:
>>> import requests
>>> res = requests.post(
... 'http://127.0.0.1:5000', json={'args': [{'decrypted': 'message'}]}
... ).json()
Note
By default, the server returns a JSON response containing the function
results (i.e., 'return'
) or, in case of server code failure, it returns
the 'error'
message.
To validate the encrypted message, you can directly invoke the decryption function as follow:
>>> res = requests.post(
... '%s/symmetric_cryptography/decrypt_message?data=input,return' % url,
... json={'kwargs': sh.selector(('key', 'encrypted'), res['return'])}
... ).json(); sorted(res)
['input', 'return']
>>> res['return'] == 'message'
True
Note
The available endpoints are formatted like:
/
or/{dsp_name}
: calls thedispatch()
method,/{dsp_name}/{function_id}
: invokes the relative function.
There is an optional query param data=input,return
, to include the
inputs into the server JSON response and exclude the possible error message.