Automation

Automation is a new, experimental feature that introduces a flexible and efficient way to automate tasks within the Onedata system. With the capability to define workflows, users can compose sequences of tasks and run them on selected data sets.

While Onedata takes care of orchestrating the execution of workflows, the implementation of individual data processing functions (lambdas) within a workflow remains the responsibility of the user. This logic defines how data should be manipulated, transformed, or analyzed. Automation does not impose restrictions on the nature of this logic, allowing users to implement their processing steps in a manner that best suits their requirements.

Key Advantages

  1. Flexibility: Users can implement data processing logic in any way that aligns with their goals and requirements.

  2. Simplified Workflow Management: Onedata takes care of the workflow's execution flow, ensuring a smooth and efficient process without requiring users to delve into the details of execution orchestration.

  3. Scalability: With Onedata managing the workflow execution, users can benefit from the system's scalability when dealing with large data sets and complex processing tasks.

OpenFaaS

OpenFaaS serves as the engine for executing user-defined functions within the Onedata Automation framework. This open-source platform facilitates the deployment of functions in containers on a Kubernetes cluster, allowing for on-demand and scalable execution.

Onedata provides tools that abstract the complexities of OpenFaaS integration. Users define their data processing functions without interacting with OpenFaaS directly.

Deploying OpenFaaS

In the simplest setup, the OpenFaaS service can be deployed within a single-node Kubernetes cluster. Our Ansible playbook can be used to automate this task — it sets up a Kind cluster and deploys all the required machinery.

  1. Clone onedata-deployments repo:

    git clone https://github.com/onedata/onedata-deployments.git
    
  2. Enter OpenFaaS deployment directory:

    cd onedata-deployments/openfaas/ansible/
    
  3. Familiarize yourself with the README.md, though all the required steps are covered in this presentation.

  4. Make sure all requirements are installed:

    sudo apt install -y python3 python3-pip
    sudo python3 -m pip install ansible "Jinja2>=2.10,<3.1" jmespath kubernetes
    
  5. Edit the group_vars/all.yml file:

    vi group_vars/all.yml
    
    # Oneprovider hostname - should be accessible from the OpenFaaS host.
    oneprovider_hostname: ???.demo.onedata.org    # replace ??? with your Oneprovider subdomain label
    
    # IP address of the VM where OpenFaaS will be deployed.
    # Should be accessible from the Oneprovider host.
    openfaas_host: ???    # replace ??? with your vm ip - hint: hostname -i
    
    ...
    
    # Openfaas admin password
    openfaas_admin_password: ???
    
    ...
    
    # pod-status-monitor will use this secret to authorize status reports sent to
    # Oneprovider, can be arbitrary.
    openfaas_activity_feed_secret: ???
    
  6. Edit the hosts file:

    vi hosts
    
    [openfaas]
      # By default the ansible script is run on the openfaas VM.
      # If this is not the case place the correct value for ansible_host.
      openfaas-vm ansible_host=???  # replace ??? with your vm ip - hint: hostname -i
    [oneprovider]
      # The host where the Oneprovider service is already running
      # (ansible will adjust its configuration and restart it).
      oneprovider-vm ansible_host=???  # replace ??? with your vm ip - hint: hostname -i
    
  7. Make sure all nodes are reachable over SSH from the Ansible host and that the public key of the user running Ansible is added to their authorized keys.

    When the node is localhost, one way to achieve this is:

    [ ! -f ~/.ssh/id_rsa ] && ssh-keygen -b 2048 -t rsa -f ~/.ssh/id_rsa -q -N ""  # will be done if not generated yet
    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    

    Alternatively, you can use ssh-agent.

  8. Run the playbook:

    ansible-playbook -i hosts site.yml
    

    There should be no errors — let us know if there are.

Caution

As Automation is an experimental feature, be aware of its evolving nature. Changes and updates may occur regularly, without backward compatibility guarantees, and we encourage you to provide feedback on your experiences.

Automation lambda

A lambda is a workflow element that can be understood as a function. It has its own internal logic, arguments, and results. Among the solutions available on the market, it is comparable to the popular AWS Lambda functions.

The primary purpose of lambdas used in the Onedata system is, of course, to work with files. However, you can also construct lambdas working on more abstract data, utilizing remote services, etc.

Main features of a lambda:

  • encapsulates processing logic based on a Docker image exposing a generic API,
  • processes received arguments, producing specific results,
  • allows accessing files in the space through Oneclient (see the sketch below this list),
  • is capable of making requests to the Oneprovider REST API,
  • receives arguments not individually, but in batches (similarly, results are also returned in batches),
  • can generate logs and statistics, and throw exceptions for monitoring and error-handling purposes.
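
Below is a minimal sketch of the Oneclient-based file access mentioned above. It assumes the Mount space option is enabled for the lambda and uses the same pseudo-path format as the handler implemented later in this training; the file ID is a placeholder.

    # Hypothetical sketch: reading a space file inside a lambda through the
    # Oneclient mount, addressing it by its file ID rather than its path.
    file_id = "00000234234..."  # placeholder - normally taken from the lambda arguments
    file_path = f"/mnt/onedata/.__onedata__file_id__{file_id}"

    with open(file_path, "rb") as fd:
        header = fd.read(1024)  # read the first kilobyte of the file's content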

Lambda arguments

  • have a name,
  • are strictly typed,
  • can be marked as optional,
  • can have default values.

Lambda logic receives a collection (batch) of objects for processing, where the argument names serve as keys.
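
For example, assuming a lambda with a single file argument (like the one built later in this training), a batch might look like this (the IDs and names are hypothetical):

    # Hypothetical batch of arguments; each object corresponds to one processed item.
    args_batch = [
        {"file": {"file_id": "00000234234...", "name": "photo1.jpg"}},
        {"file": {"file_id": "00000234235...", "name": "photo2.jpg"}},
    ]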

Lambda results

  • like arguments, have a name and are strictly typed,
  • can be obtained directly from the Docker image output or from a local file in the container,
  • are all required (each declared result must be provided).

Lambda logic must return a collection (batch) of objects, where the result names serve as keys.
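
For example, for a lambda declaring result and statusLog results, the returned batch might look like this (values are hypothetical; the n-th entry corresponds to the n-th object of the received batch):

    # Hypothetical batch of results matching the args batch sketched above.
    results_batch = [
        {"result": {"checksum": "9e107d9d..."}, "statusLog": {"status": "Processed"}},
        {"result": {"checksum": "e4d909c2..."}, "statusLog": {"status": "Processed"}},
    ]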

Lambda parameters

  • used to control custom lambda settings (like timeout policy, the algorithm to use, etc.),
  • very similar to arguments — they have names, are strictly typed, can be optional, and can have default values,
  • arguments are dynamic — calculated during workflow execution, for every processed item; parameters, on the other hand, are always constant,
  • using parameters is entirely optional.

Lambda logic receives a single parameters object, where the keys are the parameter names.
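
In the handler implemented later in this training, the parameters object is read from the batch request under ctx.config:

    # The parameters (configuration) object is constant for the whole execution,
    # while the args batch changes with every processed batch of items.
    def handle(job_batch_request, heartbeat_callback):
        parameters = job_batch_request["ctx"]["config"]  # e.g. {"algorithm": "md5"}
        ...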

Data types in automation

All data processed in workflows (e.g. lambda arguments) are typed. Supported types include:

  • number — 1, -0.5,
  • boolean — true, false,
  • string — "joe",
  • object — { a: 1 },
  • file — { "file_id": "00000234234...", ... },
  • dataset — { "datasetId": "7ad9ef096c9e...", ... },
  • range — { "start": 0, "end": 10, "step": 1 },
  • time series measurement — { "tsName": "processedBytes", "timestamp": 1701193872, "value": 1024 },
  • array — [10, 3, 6].

Automation workflow

A workflow is a collection of tasks with the common goal of processing certain input data to achieve a specified outcome. The components of a workflow include:

  • stores - data structures that contain input data, intermediate results, and the final outcomes of workflow tasks,
  • tasks - lambdas along with their customized parameters, arguments assignment, and result consumption rules,
  • parallel boxes - groups of tasks that can be executed simultaneously,
  • lanes - groups of parallel boxes; a lane iterates over a specified store and executes its tasks for each of the store's values.

Workflow schema and execution

Workflow schema - a specification (template) of a workflow, operating on generic concepts.

Workflow execution - instantiation of a schema; execution of the specified tasks for a concrete set of input data.

Workflow schema example

(screenshot)

Automation inventory

An automation inventory groups workflow schemas and their associated lambdas. It specifies who can view and edit them, using the resource membership model known from other Onedata resources.

Lambdas contained in the inventory can be reused across different workflows.

Automation inventory management privileges

 

Privilege                  Actions
View inventory             view inventory contents
Modify inventory           edit inventory details
Remove inventory           remove inventory
Manage lambdas             create, edit and remove lambdas
Manage workflow schemas    create, edit and remove workflow schemas
...                        manage inventory's memberships in other resources

Our first workflow

Now, you are going to define your first workflow schema. It will calculate the MD5 checksum of specified files and save the result as their metadata. To achieve this, you will need to complete the following four steps:

  1. Create your inventory.
  2. Create a lambda Docker image.
  3. Define a lambda in the inventory, based on the Docker image.
  4. Define a workflow that uses the lambda.

Our first workflow — creating a new automation inventory

  1. Click on the Automation item in the main menu, then in the top right corner of the sidebar, click on the + icon.
  2. Fill the input at the center of the page with a name of your choice.
  3. Click on the Create button.

Our first workflow — creating a lambda Docker image

This stage requires basic knowledge of Python and Docker, and assumes that you have access to a public Docker repository where you can push Docker images. If that's not the case, you may skip this stage and go straight to Define a lambda in the inventory, using the ready-made Docker image below:

onedata/training-lambda-calc-checksum:v1

The purpose of the lambda is to calculate the checksum of a file's content (the file is pointed to by the file argument) and store it in the file's extended attributes (xattrs).

The algorithm is configurable; it can be passed using the algorithm lambda parameter. Any algorithm supported by Python's hashlib package can be used (e.g. MD5, SHA-1, SHA-256, and so on).
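
For instance, the algorithm name can be mapped to a hashlib constructor as shown in this quick sketch (the handler presented later uses the same pattern):

    import hashlib

    # Any algorithm name understood by hashlib works; the available names can be
    # listed with hashlib.algorithms_available.
    algorithm = "sha256"
    checksum = getattr(hashlib, algorithm)(b"example content").hexdigest()
    print(checksum)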

Apart from setting the xattr, the lambda produces the following results for each processed item:

  • result summary — result,
  • log entry — statusLog.

During execution, the lambda will also continuously stream time series statistics via the stats result file.

Our first workflow — creating a lambda Docker image

  1. Create a new directory on your local disk and perform the steps that follow inside it.
  2. Create a file called handler.py which will contain the lambda code.
  3. Open the file in a text/code editor of your choice.

Our first workflow — creating a lambda Docker image

  1. Start with creating a Python file header and importing the required libraries:
    #!/usr/bin/env python
    import hashlib
    
    import json
    import os
    import time
    import zlib
    
    import xattr
    

Our first workflow — creating a lambda Docker image

  1. Create the main function, called handle:
    def handle(job_batch_request, heartbeat_callback):
        algorithm = job_batch_request["ctx"]["config"]["algorithm"]
    
        results = []
        for job_args in job_batch_request["argsBatch"]:
            results.append(run_job(job_args, algorithm))
            # should be called regularly to report that the lambda is still
            # alive and processing tasks - especially if a job can take a long time
            heartbeat_callback()
    
        return {"resultsBatch": results}
    

Our first workflow — creating a lambda Docker image

  1. Create the job runner function that opens the input file, calculates its checksum, and stores it in an xattr:
    def run_job(args, algorithm):
        file_id = args["file"]["file_id"]
        # special pseudo-path in Oneclient mount that allows accessing
        # files by their IDs rather than paths
        file_path = f"/mnt/onedata/.__onedata__file_id__{file_id}"
        metadata_key = algorithm + "_checksum"
    
        if not os.path.isfile(file_path):
            return {
                "result": {
                    "file_id": file_id,
                    "checksum": None
                },
                "statusLog": {
                    "fileName": args["file"]["name"],
                    "status": "Skipped - not a regular file",
                },
            }
    
        with open(file_path, "rb") as fd:
            checksum = calculate_checksum(fd, algorithm)
            xd = xattr.xattr(file_path)
            xd.set(metadata_key, str.encode(checksum))
    
            return {
                "result": {
                    "file_id": file_id,
                    "checksum": checksum,
                    "algorithm": algorithm,
                },
                "statusLog": {
                    "fileName": args["file"]["name"],
                    "status": "Processed",
                },
            }
    

Our first workflow — creating a lambda Docker image

  1. Create the body of the calculate_checksum function:
    def calculate_checksum(fd, algorithm):
        checksum = getattr(hashlib, algorithm)()
        while True:
            # read the file in 256 KiB chunks to keep memory usage low
            data = fd.read(262144)
            if not data:
                break
            checksum.update(data)
            stream_measurement("bytesProcessed", len(data))
    
        stream_measurement("filesProcessed", 1)
        return checksum.hexdigest()
    

Our first workflow — creating a lambda Docker image

  1. Create the stream_measurement function, used for live-streaming time series measurements via a file (local to the lambda):
    def stream_measurement(ts_name, value):
        measurement = {
            "tsName": ts_name,
            "timestamp": int(time.time()),
            "value": value,
        }
    
        with open("/out/stats", "a+") as f:
            json.dump(measurement, f)
            f.write("\n")
    

Our first workflow — creating a lambda Docker image

  1. Create a file called requirements.txt in the same directory, with the below content. These libraries are required by the handler.py implementation:
    cffi==1.15.1
    xattr==0.9.9
    

Our first workflow — creating a lambda Docker image

  1. Create a file called Dockerfile in the same directory, with the below content. It is a one-liner thanks to the fact that the base image implicitly references the handler.py and requirements.txt files, expecting them to be in the same directory.
    FROM onedata/lambda-base-slim:v1
    

Our first workflow — creating a lambda Docker image

  1. Build the Docker image and publish it in a publicly available repository, e.g. Docker Hub (this requires an account there).
    • choose a meaningful tag, e.g.: username/lambda-calc-checksum:v1, and adjust the below commands,
    • build and push the Docker image with:
      docker build . -t username/lambda-calc-checksum:v1
      docker push username/lambda-calc-checksum:v1
      

Our first workflow — define a lambda in the inventory

  1. Go to your inventory and enter Lambdas view.
  2. Click the Add new lambda button.
  3. Fill the first few fields with the following values:
    • Name: calculate-checksum,
    • Docker image: the name of the Docker image that you have built and published (or the ready-made training image, if you skipped the build),
    • Read only: no,
    • Mount space: yes.

Our first workflow — define a lambda in the inventory

  1. In the Configuration parameters section add one parameter with the following properties:
    • Name: algorithm
    • Type: String, with allowed strings: md5, sha1, sha256.

(screenshot)

Our first workflow — define a lambda in the inventory

  1. In the Arguments section, add one argument named file of type File. Adjust the Carried file attributes of the File type to include file_id and name; this is required because we use both of these attributes in our lambda logic.

(screenshot)

Our first workflow — define a lambda in the inventory

  1. In the Results section, add these three results:
    • result — of type Object,
    • statusLog — of type Object,
    • stats — of type Time series measurements, with the Via file toggle checked.

(screenshot)

Our first workflow — define a lambda in the inventory

  1. Define the measurements that will be streamed during the lambda execution. Click on Defined measurements and configure it as shown below. The values in the name matchers should be the same as the names of the measurements returned by your handler.

(screenshot)

Our first workflow — define a lambda in the inventory

  1. Click the Create button at the end of the form to finish lambda creation.

Our first workflow — define a workflow that uses our lambda

  1. Go to the Workflows view of your inventory and click the Add new workflow button.
  2. Enter calculate checksums into the Name field and click the Create button.
  3. Click the Add store button at the bottom of the editor.

Our first workflow — define a workflow that uses our lambda

  1. Fill in the form inputs with the following parameters:

    • Name: input-files,
    • Type: Tree forest,
    • Data type: File,
    • Default value: Leave blank,
    • Needs user input: yes,

    and click the Create button.

Our first workflow — define a workflow that uses our lambda

(screenshot)

Our first workflow — define a workflow that uses our lambda

  1. Repeat the previous step to create a second store with the following parameters:
    • Name: results,
    • Type: List,
    • Data type: Object,
    • Default value: Leave blank,
    • Needs user input: no.

Our first workflow — define a workflow that uses our lambda

  1. Create a new lane by clicking on the + button in the middle of the editor. Use the following parameters:
    • Name: calculate checksum,
    • Max retries: 3
    • Source store: input-files.
  2. Inside the newly created lane, create a parallel box and a task inside it.
  3. Click the Add to workflow button on the previously created lambda.

Our first workflow — define a workflow that uses our lambda

  1. In the Configuration parameters section, choose the md5 value for the algorithm parameter.
  2. In the Arguments section, choose Iterated item as the value builder for the file argument.
  3. In the Results section, add lambda result mappings to the following stores:
    • result to the results store, with Append action,
    • statusLog to the Current task system audit log store, with Append action,
    • stats to the Current task time series store; ignore the dispatch rules for now.

Our first workflow — define a workflow that uses our lambda

  1. Check the Create time series store toggle in the Time Series Store section, and add a schema:
    • Name generator type: Exact,
    • Name generator: bytesProcessed — matching the measurement names used in your handler,
    • Unit: Bytes,
    • Metrics: sum5s, sum1m, sum1h.
  2. Add a second schema:
    • Name generator type: Exact,
    • Name generator: filesProcessed,
    • Unit: None,
    • Metrics: sum5s, sum1m, sum1h.
  3. In the Charts dashboard spec input, paste the JSON definition taken from https://pastebin.com/skb5zqkF. A visual chart editor is still a work in progress.

Our first workflow — define a workflow that uses our lambda

The whole Time series store section should look like this:

(screenshot)

Our first workflow — define a workflow that uses our lambda

  1. Go back to the Results section and complete the Dispatch rules of the stats result, mapping the measurements produced by the lambda to the time series in the target store.

(screenshot)

Our first workflow — define a workflow that uses our lambda

  1. Click the Create button at the bottom to conclude task creation.
  2. Click the Save button at the top right to save the whole workflow schema.

Workflow execution scheduling

Workflow execution scheduling is constrained by a few rules:

  1. Any provider can execute workflows, as long as it is connected to a working OpenFaaS service.
  2. Workflows are executed in the context of a particular space. To schedule a workflow, you need to have both View workflow executions and Schedule workflow executions privileges in that space.
  3. Workflow executions are local to each provider — information about scheduled executions is not propagated among providers.
  4. You can schedule executions only of workflows from inventories of which you are a member.
  5. All operations performed by the workflow tasks are limited by your privileges in the space and by file access permissions.

Exercise

Executing our workflow

  1. Open the file browser of the space alpha-11p and go to your directory.
  2. Right-click on the FirstDataset directory and choose Run workflow.
  3. Choose the workflow created in the previous steps.

    Note that the directory will already be filled in as the value for the input store.

  4. Click the Run workflow button.

Exercise

Executing our workflow

  1. As the workflow is running, you can click on the task and examine the three links:
    • Time series measurements — here you can see the charts produced based on the measurements that are streamed as the lambda is running,
    • Pods activity — captures events concerning Kubernetes pods that run the lambda functions,
    • Audit log — here you can view your custom logs produced by the lambda. In the upper-right corner, you can also view the global Audit Log, with logs regarding the whole workflow.

Exercise

Executing our workflow

  1. Go back to the file browser and open the directory you used in the workflow. Then, examine the metadata that has been added to each file inside (use the Metadata context menu or the Meta tag).

Question

What if we would like to introduce a breaking change into our lambda, such as renaming one of its arguments?

Such modifications can occur frequently during the lifetime of an automation inventory. Is there a way to avoid breaking workflows that contain modified lambdas?

Lambda revisions

A lambda revision is a snapshot of all lambda properties - name, parameters, arguments, results, etc. By creating a new revision, we leave all older revisions untouched while making modifications.

Tasks in workflows always point to a specific revision of a lambda, which protects them from breaking changes.

Warning: a revision contains only the name of the Docker image, not the image content itself. Hence, it is important not to overwrite an existing Docker tag with a newer image.

Lambda revisions

Each revision has a special State property, which helps to choose the preferred revision during workflow development. There are three possible states:

  • Draft - unstable revision, usually under development,
  • Stable - revision preferred for typical usage,
  • Deprecated - obsolete revision, which should not be used anymore (due to bugs, low performance, etc.).

Once created, a revision cannot be modified, except for its State property.

Workflow revisions

Workflows, like lambdas, also have revisions. They serve a similar purpose — to introduce versioning and avoid breaking already defined logic used by other inventory members.

Unlike lambda revisions, the elements of a workflow revision can be modified after creation.

Exercise

Changing lambda argument name

We will rename the lambda's file argument to item. The change will be introduced using revisions, so as not to break existing workflows.

The first three steps below assume that you have access to a public Docker repository where you can push Docker images. If that's not the case, you may skip them and go to step 4, using the ready-made Docker image below:

onedata/training-lambda-calc-checksum:v2

  1. Open a code editor with the handler.py of your lambda.
  2. Change any args["file"] occurrences to args["item"] (there should be three of them).
  3. Save changes and build a new Docker image:
    docker build . -t username/lambda-calc-checksum:v2
    docker push username/lambda-calc-checksum:v2
    
    Remember to replace the image name with the one you used previously. Also, change the tag to v2!

Exercise

Changing lambda argument name

  4. Go to the Lambdas view of your inventory.
  5. Click the + Create new revision link in your lambda. A prefilled lambda editor should appear.
  6. Change the Docker image value to the name of the just-created image.
  7. Change the name of the file argument to item.
  8. Apply changes — click the Create button.

We now have a second revision of our lambda, ready to be used in workflows. The next step is to use it in our calculate checksums workflow.

Exercise

Changing the lambda revision used by the workflow task

This exercise could be done on the existing workflow revision, but we will create a new one to keep the older version untouched.

  1. Go to the Workflows view of your inventory.
  2. Click the + Create new revision link in your workflow.
  3. Click on the small gear icon under the task name and choose the Modify action. The task editor should appear.
  4. Change the Revision value to 2. Note that in the Arguments section, the file argument has been automatically changed to item.

Exercise

Changing the lambda revision used by the workflow task

  1. Accept changes — click the Modify button at the end of the form and then save the workflow (button in the top right corner).
  2. Optionally, you can switch to the Details tab and change State to Stable to inform other members of the inventory that this revision is now the recommended one. This modification also requires saving.

Now we have two independent versions of the same workflow, each using a different lambda revision. And — most importantly — at no point was the existing workflow revision broken. We handled a breaking upgrade smoothly.

Actions

Supported actions on workflow execution are:

  1. pause - suspends execution without interrupting ongoing jobs (waits until they are completed).

  2. resume - resumes suspended execution.

  3. cancel - immediately terminates workflow execution.

  4. retry - re-executes the workflow starting from the chosen lane for items that failed in the lane.

  5. rerun - re-executes the workflow starting from the chosen lane for all items in the lane.

Example of test workflow - echo

The echo workflow simply returns its input as output. However, its behavior can be slightly modified with configuration parameters:

  1. sleepDurationSec - number of seconds to sleep before returning the result for each job.
  2. exceptionProbability - the probability [0..1] of returning an exception rather than the expected result.

Workflow execution actions

  1. Download the echo workflow dump from our GitHub repository.
  2. Go to the Workflows view of your inventory.
  3. Click the Upload button, choose the JSON file from the first step, and click the Apply button.
  4. Click on the three dots icon beside the lane name and choose the Modify action. The MODIFY LANE modal should appear.
  5. Change the value of the Instant failure exception threshold parameter to 1 and the Max. batch size iterator option to 1.
  6. Apply changes — click the OK button.
  7. Click on the small gear icon under the task name and choose the Modify action. The task editor should appear.
  8. Change the value of the sleepDurationSec configuration parameter to 30.
  9. Change the value of the exceptionProbability configuration parameter to 0.3.

Exercise

Workflow execution actions

  1. Accept changes — click the Modify button at the end of the form and then save the workflow (button in the top right corner).
  2. Go to the file browser of space alpha-11p and enter your directory.
  3. Select at least 10 files of your choice, right-click on the selection, choose the Run workflow menu, and then select the echo workflow.
  4. Run the workflow.
  5. Click the Pause button on the right part of the ongoing workflow frame and wait for the execution to be suspended.
  6. Click the Resume button on the right part of the suspended workflow frame and wait for the execution to be resumed and completed.

Exercise

Workflow execution actions

Due to the ~30% exception probability, some of the items should fail. The exact count is visible in the task's Failed counter. This should result in a lane run failure and an automatic retry of only the failed items.

(screenshot)

Exercise

Workflow execution actions

The second run of the first lane should process only the failed items.

(screenshot)

Exercise

Workflow execution actions

  1. Return to the first run of the first lane and click the three dots next to the lane run status. The lane actions menu should appear.

    (screenshot)
  2. Select View failed items to see exactly which items have failed.
  3. Then experiment with Retry failed items or Rerun all items to see what will happen.

Example of advanced workflow — BagIt Uploader

BagIt Uploader is a workflow that processes files archived in the BagIt format and converts them into Onedata archives. The main stages of the processing include:

  1. Validation of the archive.
  2. Unpacking data.
  3. Downloading any remote files according to the fetch.txt content.
  4. Registering metadata.
  5. Calculating checksums.
  6. Creating the Onedata archive.

Example of advanced workflow — BagIt Uploader

(screenshot)

Exercise

Executing BagIt Uploader

  1. Download the BagIt Uploader workflow dump from our GitHub repository.
  2. Go to the Workflows view of your inventory.
  3. Click the Upload button, choose the JSON file from the first step, and click the Apply button.
  4. Go to the file browser of space alpha-11p and enter your directory.
  5. Download an example BagIt archive file and then upload it into your Onedata space.
  6. Right-click on the uploaded file, choose the Run workflow menu, and then select the bagit-uploader workflow.

Exercise

Executing BagIt Uploader

  1. You need to define where the archive will be unpacked. Click the Select file... link in the destination-directory section and choose Select/upload file.
  2. In the directory selector, enter your directory, create a new subdirectory named bagit-output, and then click the Confirm selection button. Your new directory should now be visible as the destination-directory.
  3. Run the workflow and wait for it to finish.
  4. Inspect the charts view — available in the bottom right corner.
  5. Go back to the file browser and check what is inside the destination directory.

The end

You made it. Congratulations!

Thank you for taking the time to get familiar with Onedata.

If you want more or have any questions, please drop us a message:

info@onedata.org