Cosmos Initialization

Cosmos is initialized by creating a Cosmos instance, which represents a connection to a SQL database. The Cosmos instance can then be used to start a Workflow. If a Workflow with the same name already exists, it will be resumed and all of its failed Tasks will be deleted from the database.

from cosmos.api import Cosmos
cosmos = Cosmos(database_url='sqlite:///my_cosmos_db.sqlite')
cosmos.initdb() # creates the tables, if they already exist this does nothing
workflow = cosmos.start(name='My_Workflow')

You may have to install the Python package for the database driver you want to use. For example, if your database URL starts with postgresql+psycopg2://, make sure you pip install psycopg2. See SQLAlchemy Engines for more details.

Note

It is often very useful to maintain one SQLite database per Workflow (especially in production environments), stored in the output directory of that Workflow. This way, Workflows are completely self-contained and all their provenance is packed in a single location.
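A minimal sketch of the one-database-per-Workflow layout described in the note. The helper name workflow_database_url and the file name cosmos.sqlite are assumptions made for illustration; they are not part of Cosmos.

```python
from pathlib import Path


def workflow_database_url(output_dir, filename="cosmos.sqlite"):
    """Return a SQLAlchemy SQLite URL for a database file inside output_dir.

    Hypothetical helper: neither the function name nor the default file
    name comes from Cosmos itself.
    """
    db_path = Path(output_dir).resolve() / filename
    # "sqlite:///" followed by an absolute POSIX path gives the four-slash
    # form that SQLAlchemy uses for absolute paths.
    return "sqlite:///" + db_path.as_posix()


# Intended use (requires Cosmos; the output directory must already exist):
#   from cosmos.api import Cosmos
#   cosmos = Cosmos(database_url=workflow_database_url("out/My_Workflow"))
#   cosmos.initdb()
#   workflow = cosmos.start(name="My_Workflow")
```

Because the database file sits next to the Workflow's outputs, archiving or deleting that directory also archives or deletes the Workflow's records.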

cosmos.api.Cosmos

class cosmos.api.Cosmos(database_url: str = 'sqlite:///:memory:', get_submit_args: callable = <function default_get_submit_args>, default_drm: str = 'local', default_drm_options: Optional[Dict] = None, default_queue: Optional[str] = None, default_time_req: Optional[int] = None, default_max_attempts: int = 1, flask_app=None, default_job_class: Optional[str] = None, default_environment_variables: Optional[Dict] = None)[source]
__init__(database_url: str = 'sqlite:///:memory:', get_submit_args: callable = <function default_get_submit_args>, default_drm: str = 'local', default_drm_options: Optional[Dict] = None, default_queue: Optional[str] = None, default_time_req: Optional[int] = None, default_max_attempts: int = 1, flask_app=None, default_job_class: Optional[str] = None, default_environment_variables: Optional[Dict] = None)[source]
Parameters
  • database_url – A SQLAlchemy database URL, e.g. sqlite:////home/user/sqlite.db (four slashes for an absolute path), mysql://user:pass@localhost/database_name, or postgresql+psycopg2://user:pass@localhost/database_name

  • get_submit_args – A function that returns arguments to be passed to the job submitter, such as resource requirements or the queue to submit to. See cosmos.api.default_get_submit_args() for details.

  • flask_app (flask.Flask) – A Flask application instance for the web interface. The default behavior is to create one.

  • default_drm – The default DRM to use (e.g. ‘local’, ‘lsf’, or ‘ge’)

  • default_drm_options – Default value for every Task.drm_options

  • default_queue – Default value for every Task.queue

  • default_time_req – Default value for every Task.time_req

  • default_environment_variables – Default value for every Task.environment_variables
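As an illustration of how these constructor parameters fit together, the sketch below keeps a set of project-wide defaults in one place and overrides them per run. The names COSMOS_DEFAULTS, cosmos_settings and make_cosmos, and the specific values (two attempts, the OMP_NUM_THREADS variable), are hypothetical; only the keyword names come from the signature above.

```python
# Hypothetical project-wide defaults; every key is a documented
# keyword argument of Cosmos.__init__.
COSMOS_DEFAULTS = dict(
    database_url="sqlite:///my_cosmos_db.sqlite",
    default_drm="local",
    default_queue=None,
    default_time_req=None,
    default_max_attempts=2,
    default_environment_variables={"OMP_NUM_THREADS": "1"},
)


def cosmos_settings(**overrides):
    """Return a copy of COSMOS_DEFAULTS with the given overrides applied."""
    settings = dict(COSMOS_DEFAULTS)
    settings.update(overrides)
    return settings


def make_cosmos(**overrides):
    """Create a Cosmos instance from the merged settings."""
    # Imported here so the helpers above can be used without Cosmos installed.
    from cosmos.api import Cosmos

    return Cosmos(**cosmos_settings(**overrides))
```

For example, make_cosmos(default_drm="lsf", default_queue="short") would keep the database URL and retry count while switching the DRM and queue.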

start(name, restart=False, skip_confirm=False, primary_log_path='workflow.log', fail_fast=False)[source]

Start, resume, or restart a workflow based on its name. If resuming, deletes failed tasks.

Parameters
  • name (str) – A name for the workflow. Must be unique for this Cosmos session.

  • restart (bool) – If True and the workflow exists, delete it first.

  • skip_confirm (bool) – If True, do not prompt the shell for input before deleting workflows or files.

  • primary_log_path (str) – The path of the primary log to write to. If None, does not write to a file. Log information is always printed to stderr.

  • fail_fast (bool) – If True, terminate the workflow the first time a Task fails. Otherwise, run all Tasks except those downstream of a failure.

  • default_max_attempts (int) – The default maximum number of times to attempt a Task. This is not part of the start() signature above; it is set on the Cosmos constructor.

Returns

A Workflow instance.

Return type

Workflow
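The resume and restart behaviours of start() can be sketched with a small wrapper. The function name open_workflow and its fresh flag are hypothetical, and tying skip_confirm to fresh is only one possible choice (it suits unattended runs); the keyword arguments passed to start() are the documented ones.

```python
def open_workflow(cosmos, name, fresh=False, log_path="workflow.log"):
    """Resume the workflow called `name`, or delete and restart it if fresh=True.

    `cosmos` is expected to be a cosmos.api.Cosmos instance.
    """
    return cosmos.start(
        name=name,
        restart=fresh,        # True: delete the existing workflow first
        skip_confirm=fresh,   # assumption: do not prompt when restarting
        primary_log_path=log_path,
        fail_fast=False,      # keep running Tasks not downstream of a failure
    )
```

Calling open_workflow(cosmos, "My_Workflow") resumes an existing workflow of that name (deleting its failed Tasks), while open_workflow(cosmos, "My_Workflow", fresh=True) starts it over.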

initdb()[source]

Initialize the database via SQL CREATE statements. If the tables already exist, nothing will happen.

resetdb()[source]

Resets (deletes then initializes) the database. This is not reversible!

runweb(host, port, debug=True)[source]

Starts the web dashboard.

Parameters
  • host (str) – Host name to bind to. Typically localhost; commonly 0.0.0.0 to allow outside internet traffic.

  • port (int) – Port to bind to.
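A short sketch of choosing the bind address for the dashboard. The helper name serve_dashboard, the public flag, and port 5000 are assumptions for illustration; only runweb(host, port, debug) is taken from the signature above.

```python
def serve_dashboard(cosmos, public=False, port=5000):
    """Start the web dashboard on the loopback address, or on all interfaces.

    `cosmos` is expected to be a cosmos.api.Cosmos instance.
    """
    # 127.0.0.1 only accepts local connections; 0.0.0.0 listens on every
    # network interface.
    host = "0.0.0.0" if public else "127.0.0.1"
    cosmos.runweb(host=host, port=port, debug=False)
```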

cosmos.api.default_get_submit_args(task, parallel_env='orte')[source]

Default method for determining the extra arguments to pass to the DRM. For example, returning “-n 3” if task.drm == “lsf” would cause all jobs to be submitted with bsub -n 3.

Parameters

task (cosmos.api.Task) – The Task being submitted.

Return type

str
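To make the get_submit_args hook concrete, here is a minimal sketch of a custom replacement that follows the "-n 3" example above and adds the Task's queue. The function name lsf_get_submit_args is hypothetical, and returning None for non-LSF Tasks is an assumption about what the caller accepts; -q is the standard bsub option for selecting a queue.

```python
from types import SimpleNamespace


def lsf_get_submit_args(task, parallel_env="orte"):
    """Return extra bsub arguments for LSF Tasks, and None otherwise."""
    if task.drm != "lsf":
        return None  # assumption: no extra arguments for other DRMs
    args = ["-n 3"]
    if getattr(task, "queue", None):
        args.append("-q {}".format(task.queue))
    return " ".join(args)


# Stand-in for a cosmos.api.Task, used only to exercise the function here.
fake_task = SimpleNamespace(drm="lsf", queue="short")
print(lsf_get_submit_args(fake_task))  # -n 3 -q short
```

Such a function would be supplied through the documented constructor parameter, for example Cosmos(database_url=..., get_submit_args=lsf_get_submit_args).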