# Hook Service

We begin by importing dependencies needed to run the script

In [None]:
# !conda install -y -c conda-forge pycurl
import json
from io import BytesIO
from urllib.parse import urlencode
import getpass
import pycurl
import requests
from IPython.display import JSON       

To identify this run of the notebook, we define a string that will be added to all order names

In [None]:
# Identifier of this notebook run; it is embedded in the order names and used
# in the ProductionOrders filters further down to find this run's orders.
test_run_id = "060"

Next we set up some variables to be used later; here we supply the auth URL, service URL and login data, as well as the name of the workflow we want to use and the list of products to process:
- identity url
- user credentials

In [None]:
# Identity (Keycloak) token endpoint used to obtain the access token
auth_url = "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token"
# Root of the Hook (OnDemand Processing) OData API; it must end with "odata/v1/"
# (e.g. https://odp.data.destination-earth.eu/odata/v1/)
service_root_url = "https://odp.data.destination-earth.eu/odata/v1/"

# Ask for the DEDL credentials interactively; getpass reads the password
# without echoing it.
print('Provide credentials for the Hook API')
username = input('DEDL Username: ')
password = getpass.getpass('DEDL Password: ')

# Form fields of the password-grant token request sent to auth_url
login_data = dict(
    username=username,
    password=password,
    client_id="ordering",
    grant_type="password",
)

Now we get the JWT access token and save it to a variable

In [None]:
# Request an access token from the identity service with a form-encoded POST.
c = pycurl.Curl()
buffer = BytesIO()
post_fields = urlencode(login_data)
try:
    c.setopt(c.URL, auth_url)
    c.setopt(c.POSTFIELDS, post_fields)
    c.setopt(c.WRITEDATA, buffer)
    c.perform()
    status_code = c.getinfo(c.RESPONSE_CODE)
finally:
    # Release the curl handle even when the transfer itself fails.
    c.close()

token_response = json.loads(buffer.getvalue().decode())
if not isinstance(token_response, dict) or "access_token" not in token_response:
    # A rejected login returns an error document without "access_token";
    # report it instead of failing later with an opaque KeyError.
    raise RuntimeError(
        f"Token request failed (HTTP {status_code}): {token_response}"
    )

token = token_response["access_token"]
# Authorization header sent with every Hook API request below
api_headers = {'Authorization': 'Bearer ' + token}

# Basic interaction with the API
Next we can check what possible workflows are available to us by using method   
```https://odp.data.destination-earth.eu/odata/v1/Workflows```

In [None]:
# List every workflow exposed by the service.
workflows_url = service_root_url + "Workflows"
result = requests.get(workflows_url, headers=api_headers).json()
# Keep a pretty-printed text copy of the listing next to the parsed result.
available_workflows = json.dumps(result, indent=2)
JSON(result)

If we want to see the details of a specific workflow, for example additional parameters that can be changed, we can do that too   
```https://odp.data.destination-earth.eu/odata/v1/Workflows?$expand=WorkflowOptions&$filter=(Name eq lai)```   
**\\$expand=WorkflowOptions** shows all parameters accepted by workflow   
**\\$filter=(Name eq lai)** narrows the result to workflow called "lai" (processor computing Leaf Area Index)

In [None]:
# Fetch the 'data-harvest' workflow together with the options it accepts.
processor_name_query = "'data-harvest'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:
# Fetch the 'lai' workflow together with the options it accepts.
processor_name_query = "'lai'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:
# Fetch the 'card_bs_private' workflow together with the options it accepts.
processor_name_query = "'card_bs_private'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:
# Fetch the 'card_cohinf' workflow together with the options it accepts.
processor_name_query = "'card_cohinf'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:
# Fetch the 'copdem' workflow together with the options it accepts.
processor_name_query = "'copdem'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:

# Fetch the 'maja_private' workflow together with the options it accepts.
processor_name_query = "'maja_private'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:
# Fetch the 'c2rcc' workflow together with the options it accepts.
processor_name_query = "'c2rcc'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

In [None]:


# Fetch the 'sen2cor_private' workflow together with the options it accepts.
processor_name_query = "'sen2cor_private'"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq {processor_name_query})"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result)

   
Now we can order a couple of products. We order them one by one to show the priority later; usually, multiple products can be ordered in a single call by using the endpoint   
```https://odp.data.destination-earth.eu/odata/v1/BatchOrder/OData.CSC.Order```   
For the purpose of the test, we are getting data from a remote repository - CREODIAS - so we need to use CREODIAS credentials to get the input product   

In [None]:
# input catalogue - CloudFerro / CREODIAS
# These values are passed as workflow options by the card_cohinf and
# maja_private orders below.
# URL of the OData API
input_catalogue = "https://datahub.creodias.eu"
# URL of the EO data S3 endpoint
input_storage_url = "https://eodata.cloudferro.com"
# CREODIAS EO data S3 credentials (https://eodata-keymanager.creodias.eu/ - active CREODIAS account is required)
# The two values below are placeholders; replace them with your own keys.
input_storage_access_key = "your-access-key"
input_storage_secret_key = "your-secret-key"

In [None]:
# output storage - DEDL Central Site - Islet service
# URL of the S3 endpoint in the Central Site cloud
output_storage_url = "https://s3.central.data.destination-earth.eu"
# name of the object storage bucket where the results will be stored
# (placeholder; replace with your own bucket name)
output_bucket = "your-bucket-name"
# Islet object storage credentials (openstack ec2 credentials)
# The two values below are placeholders; replace them with your own keys.
output_storage_access_key = "your-access-key"
output_storage_secret_key = "your-secret-key"
# prefix passed to the workflows as "s3_prefix"; it embeds test_run_id
output_prefix = "dedl_" + test_run_id

### DATA HARVEST

Make an order to harvest data from the Destination Earth Data Lake - Harmonised Data Access service. 
Output will be saved to the user's S3 (within the Islet storage service or any S3-compatible storage) when output_storage is set to PRIVATE   
```{"Name": "output_storage", "Value": "PRIVATE"}```

In [None]:
# STAC INPUT AVAILABLE :

# Order the "data-harvest" workflow for a single product identifier.
processor_name = "data-harvest"
identifier = "S2A_MSIL2A_20180124T092251_N0213_R093_T35TLG_20210214T010009.SAFE"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    # output bucket and its credentials
    "s3_bucket": output_bucket,
    "s3_access_key": output_storage_access_key,
    "s3_secret_key": output_storage_secret_key,
    "s3_prefix": output_prefix,
    "s3_endpoint_url": output_storage_url,
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-2.MSI.L2A",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - custom bucket - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### LAI - Sentinel-2 Leaf Area Index workflow

Make an order for biophysical analysis (LAI)   
Output will be saved to the user's S3 (within the Islet storage service or any S3-compatible storage) when output_storage is set to PRIVATE   
```{"Name": "output_storage", "Value": "PRIVATE"}```

In [None]:
# STAC INPUT AVAILABLE : FAILED

# Order the "lai" workflow for a single product identifier.
processor_name = "lai"
identifier = "S2A_MSIL2A_20230906T102601_N0509_R108_T32UMA_20230906T164400"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    # output bucket and its credentials
    "s3_bucket": output_bucket,
    "s3_access_key": output_storage_access_key,
    "s3_secret_key": output_storage_secret_key,
    "s3_prefix": output_prefix,
    "s3_endpoint_url": output_storage_url,
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-2.MSI.L2A",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - custom bucket - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### CARD-BS-PRIVATE - Sentinel-1 Backscatter

Make an order to private storage as in the previous example

In [None]:
# STAC INPUT AVAILABLE : OK

# Order the "card_bs_private" workflow with the output going to the user's bucket.
processor_name = "card_bs_private"
identifier = "S1A_IW_GRDH_1SDV_20230910T054256_20230910T054321_050261_060CD9_BF21"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    # output bucket and its credentials
    "s3_bucket": output_bucket,
    "s3_access_key": output_storage_access_key,
    "s3_secret_key": output_storage_secret_key,
    "s3_prefix": output_prefix,
    "s3_endpoint_url": output_storage_url,
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-1.L1_GRD",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - custom bucket - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}

request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)


And another order to temporary storage, indicated by setting output_storage parameter to TEMPORARY   
```{"Name": "output_storage", "Value": "TEMPORARY"}```   
The result of processing will be stored in shared storage and download link provided in the output product details

In [None]:
# STAC INPUT AVAILABLE : OK

# Order the "card_bs_private" workflow again, this time with TEMPORARY output storage.
processor_name = "card_bs_private"
identifier = "S1A_IW_GRDH_1SDV_20230908T170841_20230908T170906_050239_060C22_92AB"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    "output_storage": "TEMPORARY",
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-1.L1_GRD",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - temp - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### CARD-COHINF - Sentinel-1 Coherence/Interferometry

In [None]:
# NO STAC INPUT AVAILABLE

# Order the "card_cohinf" workflow, reading the input from the CREODIAS
# catalogue and S3 storage configured earlier.
processor_name = "card_cohinf"
identifier = "S1A_IW_SLC__1SDV_20230930T172417_20230930T172444_050560_06170F_59DF"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    "output_storage": "TEMPORARY",
    "card_producttype": "CARD_INF",
    "timespan": "24",
    # input S3 storage and catalogue
    "input_s3_url": input_storage_url,
    "input_s3_secret_key": input_storage_secret_key,
    "input_s3_access_key": input_storage_access_key,
    "input_catalogue_url": input_catalogue,
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - temp - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### C2RCC - Case 2 Regional Coast Colour

In [None]:
# STAC INPUT AVAILABLE : FAILED

# Order the "c2rcc" workflow with TEMPORARY output storage.
processor_name = "c2rcc"
identifier = "S2B_MSIL1C_20231001T102739_N0509_R108_T31TGL_20231001T123227"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    "output_storage": "TEMPORARY",
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-2.MSI.L1C",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - temp - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### sen2cor - Sen2Cor Sentinel-2 Bottom of Atmosphere optical correction

In [None]:
# STAC INPUT AVAILABLE : OK

# Order the "sen2cor_private" workflow with TEMPORARY output storage.
processor_name = "sen2cor_private"
identifier = "S2B_MSIL1C_20231001T102739_N0509_R108_T32TMS_20231001T123227"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    "output_storage": "TEMPORARY",
    # STAC catalogue the input product is taken from
    "input_catalogue_type": "STAC",
    "input_catalogue_url": "https://hda.data.destination-earth.eu/stac",
    "input_catalogue_collection": "EO.ESA.DAT.SENTINEL-2.MSI.L1C",
    # credentials used to access the source data
    "source_client_id": "hda-public",
    "source_client_secret": "",
    "source_username": username,
    "source_password": password,
    "source_realm": "dedl",
    "source_server_url": "https://identity.data.destination-earth.eu/auth",
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - temp - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### maja_private - MAJA Sentinel-2  Bottom of Atmosphere optical correction

MAJA (https://gitlab.orfeo-toolbox.org/maja/maja) is a multistep processor which benefits from processing long timeseries of Sentinel-2 scenes.  
To create a high quality output at least 5 observations of the same tile, preceding the time of interest should be processed.  
In the example we request production of a 18 days long timeseries ending with scene S2B_MSIL1C_20230710T001609_N0509_R059_T59WPP_20230710T002600

In [None]:
# NO STAC INPUT AVAILABLE

# Order the "maja_private" workflow; the time series is described by the
# "timeseries_end_id" and "timespan" options, and the identifier is left empty.
processor_name = "maja_private"
identifier = ""

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    # output bucket and its credentials
    "output_storage": "PRIVATE",
    "s3_bucket": output_bucket,
    "s3_access_key": output_storage_access_key,
    "s3_secret_key": output_storage_secret_key,
    "s3_prefix": output_prefix,
    "s3_endpoint_url": output_storage_url,
    "input_catalogue_url": input_catalogue,
    # NOTE(review): the markdown above this cell names a different end scene
    # (S2B_MSIL1C_20230710T001609_...) than the value below - confirm which
    # one is intended.
    "timeseries_end_id": "S2A_MSIL2A_20230601T104021_N0509_R008_T31UFT_20230601T215503",
    "timespan": "18",
    # S3 storage holding the L1C input scenes
    "l1c_input_s3_endpoint": input_storage_url,
    "l1c_input_s3_access_key": input_storage_access_key,
    "l1c_input_s3_secret_key": input_storage_secret_key,
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - custom storage - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

### dedl_hello_world - User defined workflow

This custom processor demonstrates the integration of a new workflow into the Hook service.
Processor was delivered as a docker container and integrated with Hook standard input/output functions.
The resulting workflow retrieves file from input s3 storage, converts all characters in the file to uppercase and stores the result in the output s3 storage.

In [None]:
# The test input file (example.data) was put in a private object storage,
# in a separate bucket "odp-input".
# The same credentials are used for both the input and the output bucket.
processor_name = "dedl_hello_world"
identifier = "noidentifier"

# Option name -> value; every pair is sent as {"Name": ..., "Value": ...}.
workflow_options = {
    # output bucket and its credentials
    "output_storage": "PRIVATE",
    "s3_bucket": output_bucket,
    "s3_access_key": output_storage_access_key,
    "s3_secret_key": output_storage_secret_key,
    "s3_prefix": output_prefix,
    "s3_endpoint_url": output_storage_url,
    # input location, reusing the output storage endpoint and credentials
    "input_s3_endpoint_url": output_storage_url,
    "input_s3_path": "s3://odp-input/",
    "input_s3_access_key": output_storage_access_key,
    "input_s3_secret_key": output_storage_secret_key,
}
order_body_custom_bucket = {
    "Name": f"DEDL - Hook presentation - custom storage - {processor_name} - {test_run_id}",
    "WorkflowName": processor_name,
    "IdentifierList": [identifier],
    "WorkflowOptions": [
        {"Name": option_name, "Value": option_value}
        for option_name, option_value in workflow_options.items()
    ],
}
request = requests.post(
    service_root_url + "BatchOrder/OData.CSC.Order",
    data=json.dumps(order_body_custom_bucket),
    headers=api_headers,
)
JSON(request.json(), indent=2)

To make sure everything went as expected, we can check the status of our orders

In [None]:
# Show every production order whose name contains this run's identifier.
orders_filter = "(contains(Name,'" + test_run_id + "'))"
orders_response = requests.get(
    service_root_url + "ProductionOrders?$filter=" + orders_filter,
    headers=api_headers,
)
JSON(orders_response.json(), indent=2)

# High priority order   
If we want to order something with a higher priority, we repeat this process, this time adding a priority to the call body and ordering only one product   
We will use a test workflow to simulate a shorter request   
First let's check what parameters are accepted by this processor

In [None]:
# Fetch the 'odp-test' workflow together with the options it accepts.
# processor_name is reused by the priority orders that follow.
processor_name = "odp-test"
workflow_query_url = (
    f"{service_root_url}Workflows?$expand=WorkflowOptions"
    f"&$filter=(Name eq '{processor_name}')"
)
result = requests.get(workflow_query_url, headers=api_headers).json()
workflow_details = json.dumps(result, indent=2)
JSON(result, indent=2)

And now issue an order with priority 5 which will run for 120 seconds

In [None]:
# First priority order: priority 5, with the test workflow's "sleep_time"
# option set to 120.
order_body_increased_priority = {
        # The name ends with test_run_id so that the later
        # "endswith(Name, test_run_id)" status query returns this order.
        "Name": "DEDL - Hook presentation - increased priority - first - " + test_run_id,
        "WorkflowName": processor_name,
        "IdentifierList":[],
        "Priority": 5,
        "WorkflowOptions":[{
        "Name": "sleep_time",
        "Value": 120}]
    }

result = requests.post(service_root_url+"BatchOrder/OData.CSC.Order",
                            json.dumps(order_body_increased_priority),headers=api_headers)

# IPython's JSON display needs the parsed body (dict or list); passing the
# Response object itself raises TypeError.
JSON(result.json(), indent=2)

In [None]:
# Second priority order: priority 10, with the test workflow's "sleep_time"
# option set to 120.
order_body_increased_priority = {
        # The name ends with test_run_id so that the later
        # "endswith(Name, test_run_id)" status query returns this order.
        "Name": "DEDL - Hook presentation - increased priority - second - " + test_run_id,
        "WorkflowName": processor_name,
        "IdentifierList":[],
        "Priority": 10,
        "WorkflowOptions":[{
        "Name": "sleep_time",
        "Value": 120}]
    }

result = requests.post(service_root_url+"BatchOrder/OData.CSC.Order",
                            json.dumps(order_body_increased_priority),headers=api_headers)

# IPython's JSON display needs the parsed body (dict or list); passing the
# Response object itself raises TypeError.
JSON(result.json(), indent=2)

Once again let's make sure everything worked

In [None]:
# Show the production orders whose name ends with this run's identifier.
orders_url = (
    f"{service_root_url}ProductionOrders"
    f"?$filter=(endswith(Name,'{test_run_id}'))"
)
result = requests.get(orders_url, headers=api_headers).json()
JSON(result, indent=2)

### Listing the contents of the output storage

In [None]:
import boto3

# S3 client for the output storage configured earlier in the notebook
s3 = boto3.client(
    's3',
    aws_access_key_id=output_storage_access_key,
    aws_secret_access_key=output_storage_secret_key,
    endpoint_url=output_storage_url,
)

# List the objects under this run's prefix. The prefix is derived from
# output_prefix so that changing test_run_id does not leave a stale value here.
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + '/')

found_any = False
for page in pages:
    # A page without matching objects has no 'Contents' key.
    for obj in page.get('Contents', []):
        found_any = True
        print(obj['Key'])

if not found_any:
    # Only report; calling exit() here would terminate the notebook kernel.
    print("No files exist")

### Download the results from temporary storage

In [None]:
# replace XXXX with order id of a completed order from the current test run
# list order items within a production order
order_id = "XXXX"

# Build the URL from service_root_url so this cell follows the service
# configured at the top of the notebook.
products_url = service_root_url + "BatchOrder(" + order_id + ")/Products"
JSON(requests.get(products_url, headers=api_headers).json(), indent=2)

In [None]:
# replace XXXX with order id of a completed order from the current test run and order item id YYYY from the item listing
# download output product
# result is stored in output.zip and number of transferred bytes is printed
order_id = "XXXX"
order_item_id = "YYYY"

# Build the URL from service_root_url so this cell follows the service
# configured at the top of the notebook.
url = service_root_url + "BatchOrder(" + order_id + ")/Product(" + order_item_id + ")/$value"
r = requests.get(url, headers=api_headers, allow_redirects=True)
# Stop on an HTTP error instead of saving the error body as output.zip.
r.raise_for_status()

# The context manager closes the file once the content is written.
with open('output.zip', 'wb') as output_file:
    bytes_written = output_file.write(r.content)
print(bytes_written)

### User defined workflow

Users can 
In scope of the validation a processor 