# Stack Service - Dask

## Authentication via OIDC password grant flow
The DEDLAuth Class is a helper class to authenticate a user against the given identity provider.
The user's password is handed directly to the request object and is not stored.
The refresh token is used to request a new access token once the current one has expired.

In [None]:
from dask_gateway.auth import GatewayAuth
from getpass import getpass
import requests
import jwt
from jwt import PyJWKClient
from datetime import datetime

class DEDLAuth(GatewayAuth):
    """Dask Gateway authenticator based on the OIDC password grant flow.

    On construction the user is prompted for the password, a token set is
    requested from ``token_url`` and the access token is decoded.  Before
    each gateway request the access token is checked for expiry and, if
    necessary, renewed with the refresh token.
    """

    # Seconds to wait for the identity provider.  Without a timeout a
    # ``requests`` call can block indefinitely on an unresponsive server.
    request_timeout = 30

    def __init__(self, username):
        """Authenticate ``username`` (prompts for the password on the console)."""
        self.username = username
        self.client_id = "dedl-stack-public-client"
        self.token_url = "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token"
        self.cert_url = "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/certs"
        self.token = self.get_token()
        self.access_token_decoded = self.decode_access_token()

    def _request_token(self, payload):
        """POST ``payload`` to the token endpoint and return the JSON body.

        Raises:
            requests.HTTPError: if the identity provider answers with an
                error status (wrong credentials, invalid refresh token, ...).
        """
        response = requests.post(
            self.token_url, data=payload, timeout=self.request_timeout
        )
        # Fail here with the HTTP status instead of later with a confusing
        # ``KeyError: 'access_token'`` on the error document.
        response.raise_for_status()
        return response.json()

    def get_token(self):
        """Request a token set via the password grant and return it as a dict."""
        payload = {
            "grant_type": "password",
            "client_id": self.client_id,
            "username": self.username,
            # The password goes straight into the request and is not kept
            # on the instance.
            "password": getpass(prompt="Your DEDL Password:"),
        }
        return self._request_token(payload)

    def decode_access_token(self):
        """Verify the current access token's signature and return its claims."""
        jwks_client = PyJWKClient(self.cert_url)
        access_token = self.token["access_token"]
        signing_key = jwks_client.get_signing_key_from_jwt(access_token)
        return jwt.decode(access_token, signing_key.key, algorithms=["RS256"])

    def token_expired(self):
        """Return True if the access token's ``exp`` claim lies in the past."""
        from datetime import timezone

        # Compare timezone-aware values so local DST shifts cannot skew the
        # result.
        expires_at = datetime.fromtimestamp(
            self.access_token_decoded["exp"], tz=timezone.utc
        )
        return datetime.now(timezone.utc) > expires_at

    def refresh_token_exchange(self):
        """Request a new token set using the stored refresh token."""
        payload = {
            "grant_type": "refresh_token",
            # Must be the client the refresh token was issued to, i.e. the
            # same client id that get_token() used.
            "client_id": self.client_id,
            "refresh_token": self.token["refresh_token"],
        }
        return self._request_token(payload)

    def refresh(self):
        """Replace the stored token set with a refreshed one and decode it."""
        self.token = self.refresh_token_exchange()
        self.access_token_decoded = self.decode_access_token()

    def pre_request(self, resp):
        """Return ``(headers, None)`` carrying a valid bearer token.

        The access token is refreshed first if it has expired.
        """
        if self.token_expired():
            self.refresh()
        headers = {"Authorization": "Bearer " + self.token["access_token"]}
        return headers, None

## Access to Dask across multiple locations

Dask Gateway is deployed at each location within the DEDL.
The DaskMultiCluster Class provides an interface to communicate with all Dask Gateway instances at once.
The API is kept as close as possible to the API of Dask Gateway itself.

The class encompasses a registry object (dict) holding all the information needed to connect to the different Dask Gateway instances.
Individual objects per location can be accessed via the per-site dictionaries of a ```DaskMultiCluster``` instance, e.g. ```instance.cluster['central']```. This can be useful in case one would like to interact with a single location only.

Connection details per DEDL location are as follows:

- Central Site
    - Address: **http://dask.central.data.destination-earth.eu**, HTTP API endpoints of Dask Gateway
    - Proxy Address: **tcp://dask.central.data.destination-earth.eu:80**, TCP communication with actual cluster

- LUMI Bridge
    - Address: **http://dask.lumi.data.destination-earth.eu**, HTTP API endpoints of Dask Gateway
    - Proxy Address: **tcp://dask.lumi.data.destination-earth.eu:80**, TCP communication with actual cluster

Further details on how to use these connection details can be found below.

Each cluster will be composed of 2 workers by default, with adaptive scaling enabled towards a maximum of 10 workers. In addition, the workers are configured to have 2 cores and 4 GB RAM. Users are allowed to change this via the [cluster options](https://gateway.dask.org/cluster-options.html) exposed in the range of:
- Worker cores:
    - min: 1
    - max: 10
- Worker memory:
    - min: 1 GB
    - max: 32 GB
    
Dask Worker and Scheduler nodes are based on a custom-built [container image](registry.eodc.eu/eodc/clusters/dedl-deployment/dedl-dask) with the aim of matching the environment (Jupyter kernel) of the DEDL JupyterLab instance. Warnings will be displayed if a version mismatch is detected. Feel free to use your custom image to run your workloads by replacing the container image in the cluster options object.


In [None]:
from dask_gateway import Gateway
from distributed import Client

class DaskMultiCluster:
    """Handle Dask Gateway instances at several sites through one object.

    ``gateway_registry`` lists the known sites.  For every site the
    instance keeps a gateway connection in ``self.gateway`` and, once
    ``new_cluster`` has been called, a cluster in ``self.cluster`` and a
    client in ``self.client``.  All three dicts are keyed by the site key
    of the registry (e.g. ``"central"``).
    """

    # Connection details and default adaptive-scaling limits per site.
    gateway_registry = {
        "central": {
            "name": "Central Site",
            "address": "http://dask.central.data.destination-earth.eu",
            "proxy_address": "tcp://dask.central.data.destination-earth.eu:80",
            "default_config": {
                "min": 2,
                "max": 10
            }
        },
        "lumi": {
            "name": "LUMI Bridge",
            "address": "http://dask.lumi.data.destination-earth.eu",
            "proxy_address": "tcp://dask.lumi.data.destination-earth.eu:80",
            "default_config": {
                "min": 2,
                "max": 10
            }
        }
    }

    def __init__(self, auth):
        """Connect to the gateway of every registered site using ``auth``."""
        self.authenticator = auth
        # Per-instance containers: as class attributes these dicts would be
        # shared (and overwritten) between all DaskMultiCluster objects.
        self.gateway = {}
        self.cluster = {}
        self.client = {}
        for site, entry in self.gateway_registry.items():
            self.gateway[site] = Gateway(
                address=entry["address"],
                proxy_address=entry["proxy_address"],
                auth=self.authenticator,
            )

    def print_registry(self) -> None:
        """Pretty-print the gateway registry."""
        # Imported locally because the surrounding file does not import it.
        import pprint

        pprint.pprint(self.gateway_registry)

    def get_gateways(self) -> None:
        """Print one line per site with its registry entry."""
        for site, entry in self.gateway_registry.items():
            print(f"{site}: {entry}")

    def new_cluster(self, *args, **kwargs) -> None:
        """Create a cluster and a client at every registered site.

        All arguments are forwarded to each gateway's ``new_cluster``.
        Every cluster is switched to adaptive scaling with the site's
        ``default_config`` limits.  The clients are not registered as the
        default Dask client.
        """
        for site, entry in self.gateway_registry.items():
            print(f"Create new cluster for {entry['name']}")
            cluster = self.gateway[site].new_cluster(*args, **kwargs)
            self.cluster[site] = cluster
            limits = entry["default_config"]
            cluster.adapt(minimum=limits["min"], maximum=limits["max"])
            self.client[site] = cluster.get_client(set_as_default=False)

    def compute(self, data, location_key="location", **kwargs):
        """Compute ``data`` on the site named in ``data.attrs[location_key]``.

        Extra keyword arguments are forwarded to the client's ``compute``,
        whose result is returned.
        """
        return self.client[data.attrs[location_key]].compute(data, **kwargs)

    def get_cluster_url(self) -> None:
        """Print the dashboard link of every site that has a cluster."""
        for site in self.gateway_registry:
            # Sites without a cluster (e.g. before new_cluster()) are
            # skipped rather than raising KeyError.
            if site in self.cluster:
                print(self.cluster[site].dashboard_link)

    def shutdown(self) -> None:
        """Close the cluster of every site that has one."""
        for site in self.gateway_registry:
            if site in self.cluster:
                self.cluster[site].close()

## Connect to all locations known by DaskMultiCluster
Authentication object needs to be forwarded in order to get access. Provide your DEDL username to authenticate with the DEDL Authenticator.

In [None]:
# Read the DEDL username from the notebook's input field.
username = input()
# Build the authenticator; its constructor asks for the password.
authenticator = DEDLAuth(
    username=username,
)

In [None]:
# Connect to the Dask Gateway of every site in the registry.
dedl_dask = DaskMultiCluster(auth=authenticator)

In [None]:
# Create one adaptively scaled cluster, plus a client, per site.
dedl_dask.new_cluster()

#### scale individual cluster

In [None]:
# Look up the cluster object of a single site by its registry key.
dedl_dask.cluster['central']

### Get dashboard links to get more insights

In [None]:
# Print the dashboard link of each site's cluster.
dedl_dask.get_cluster_url()

### Shutdown the cluster and free up all resources

In [None]:
# Close the cluster at every site.
dedl_dask.shutdown()