2. Complete Example

Here is the source code of the examples/ex2.py script that you ran in Getting Started.

examples/ex2.py

import argparse
import subprocess
import sys

from cosmos.api import (
    Cosmos,
    Dependency,
    draw_stage_graph,
    draw_task_graph,
    pygraphviz_available,
    py_call,
)


def echo(word, out_txt):
    """Write ``word`` to the file at ``out_txt``.

    The file is opened in text write mode, so any existing content is
    replaced.  Nothing is appended to ``word`` (no trailing newline).
    """
    with open(out_txt, mode="w") as out_file:
        out_file.write(word)


def cat(in_txts, out_txt):
    """Concatenate the files in ``in_txts`` into ``out_txt`` using ``cat``.

    The command is run from an argument list (no shell) with its stdout
    redirected to ``out_txt``, so paths containing spaces or shell
    metacharacters are passed through verbatim instead of being split or
    interpreted by a shell.

    :param in_txts: iterable of input paths; each one is converted with ``str()``.
    :param out_txt: path of the output file; it is created or truncated.
    :raises subprocess.CalledProcessError: if ``cat`` exits with a non-zero status.
    """
    # Opening the output ourselves replaces the shell's "> out_txt" redirection.
    with open(out_txt, "wb") as out_fp:
        subprocess.run(["cat", *map(str, in_txts)], stdout=out_fp, check=True)


def word_count(in_txts, out_txt, chars=False):
    """Run ``wc`` over the files in ``in_txts`` and write its output to ``out_txt``.

    The command is run from an argument list (no shell) with its stdout
    redirected to ``out_txt``, so paths containing spaces or shell
    metacharacters are handled correctly.

    :param in_txts: iterable of input paths; each one is converted with ``str()``.
    :param out_txt: path of the output file; it is created or truncated.
    :param chars: when true, ``-c`` is passed to ``wc``; otherwise ``wc`` runs
        with its default options.
    :raises subprocess.CalledProcessError: if ``wc`` exits with a non-zero status.
    """
    cmd = ["wc"]
    if chars:
        cmd.append("-c")
    cmd.extend(map(str, in_txts))

    # Opening the output ourselves replaces the shell's "> out_txt" redirection.
    with open(out_txt, "wb") as out_fp:
        subprocess.run(cmd, stdout=out_fp, check=True)


def recipe(workflow):
    """Populate ``workflow`` with the example DAG: echo -> cat -> word_count -> summary.

    Two ``echo`` Tasks are added first.  Each of them fans out into two ``cat``
    Tasks, every ``cat`` Task feeds exactly one ``word_count`` Task, and a
    final ``cat`` Task gathers all word-count outputs into ``summary.txt``.
    """
    # Source nodes of the DAG: one Task per word, echoing "hello" and "world".
    echo_tasks = []
    for word in ("hello", "world"):
        echo_params = {"word": word, "out_txt": f"{word}.txt"}
        echo_tasks.append(workflow.add_task(func=echo, params=echo_params, uid=word, mem_req=10))

    # one2many: every echo Task is split into two dependent cat Tasks.
    counted = []
    for source in echo_tasks:
        word = source.params["word"]
        for n in (1, 2):
            label = f"{word}_{n}"
            cat_task = workflow.add_task(
                func=cat,
                params={"in_txts": [source.params["out_txt"]], "out_txt": f"{word}/{n}/cat.txt"},
                parents=[source],
                mem_req=10,
                uid=label,
            )

            # one2one: each cat Task gets a single dependent word_count Task.
            # A Dependency instance specifies an input and its parent Task
            # simultaneously, so passing parents=[cat_task] is not necessary.
            wc_params = {
                "in_txts": [Dependency(cat_task, "out_txt")],
                "out_txt": f"{word}/{n}/wc.txt",
                "chars": True,
            }
            counted.append(workflow.add_task(func=word_count, params=wc_params, mem_req=10, uid=label))

    # many2one (a reduce operation): a single node whose parents are all of the
    # word_count Tasks cats their outputs into one file.
    workflow.add_task(
        func=cat,
        params={"in_txts": [Dependency(t, "out_txt") for t in counted], "out_txt": "summary.txt"},
        parents=counted,
        stage_name="Summary_Analysis",
        mem_req=10,
        uid="",  # It's the only Task in this Stage, so doesn't need a specific uid
    )


def main():
    """Parse the command line, build the Example2 workflow, run it, and exit.

    The process exits with status 0 when ``workflow.successful`` is truthy
    and 1 otherwise.
    """
    p = argparse.ArgumentParser()
    p.add_argument("-drm", default="local", help="", choices=("local", "awsbatch", "slurm", "drmaa:ge", "ge"))
    p.add_argument("-q", "--queue", help="Submit to this queue if the DRM supports it")

    args = p.parse_args()

    cosmos = Cosmos("cosmos.sqlite", default_drm=args.drm, default_max_attempts=2, default_queue=args.queue)
    cosmos.initdb()

    workflow = cosmos.start("Example2", skip_confirm=True)

    recipe(workflow)

    # any parameters that start with out_ are output directories, and will be created if
    # the user calls workflow.make_output_dirs
    workflow.make_output_dirs()
    workflow.run(max_cores=10, cmd_wrapper=py_call)

    # Noting here that if you wanted to look at the outputs of any Tasks to decide how to generate the rest of a DAG
    # you can do so here, proceed to add more tasks via workflow.add_task(), and then call workflow.run() again.
    # Yes, it does require running all Tasks in the dag to get the outputs of any Task, and we hope to address
    # that limitation at some point in the future.

    if pygraphviz_available:
        # These images can also be seen on the fly in the web-interface.
        # Each graph is written to the file that is named after it.
        draw_stage_graph(workflow.stage_graph(), "/tmp/ex1_stage_graph.png", format="png")
        draw_task_graph(workflow.task_graph(), "/tmp/ex1_task_graph.png", format="png")
    else:
        print("Pygraphviz is not available :(")

    sys.exit(0 if workflow.successful else 1)


if __name__ == "__main__":
    main()

Here’s the DAG that was created. If you use the web interface, each node is a convenient link you can click to jump straight to the debug information that pertains to that Task or Stage.

../_images/ex1_stage_graph.png

The Stage Graph is a high-level overview. Often the DAG of Tasks is too large for a visualization to be useful:

../_images/ex1_task_graph.png