Cosmos Initialization¶
Cosmos is initialized by instantiating a cosmos.Cosmos instance, which represents a connection to a SQL database. The Cosmos instance can then be used to start a Workflow. If a Workflow already exists with the same name, it will be resumed and all of its failed Tasks will be deleted from the database.
from cosmos.api import Cosmos
cosmos = Cosmos(database_url='sqlite:///my_cosmos_db.sqlite')
cosmos.initdb() # creates the tables, if they already exist this does nothing
workflow = cosmos.start(name='My_Workflow')
You may have to install the Python package required by the database driver you'd like to use. For example, if your database_url uses postgresql+psycopg2://, make sure you pip install psycopg2. See SQLAlchemy Engines for more details.
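For example, a minimal sketch of pointing Cosmos at a PostgreSQL database (the user, password, host, and database name below are placeholders):

from cosmos.api import Cosmos

# hypothetical PostgreSQL connection string; requires `pip install psycopg2`
cosmos = Cosmos(database_url='postgresql+psycopg2://user:pass@localhost/my_cosmos_db')
cosmos.initdb()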
Note
It is often very useful to maintain one SQLite database per Workflow (especially in production environments), stored in the output directory of that Workflow. This way, Workflows are completely atomic and all of their provenance is nicely packed in a single location.
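A minimal sketch of that layout, assuming a hypothetical out/my_workflow output directory:

import os
from cosmos.api import Cosmos

# hypothetical per-workflow output directory
output_dir = 'out/my_workflow'
os.makedirs(output_dir, exist_ok=True)

# keep the SQLite database alongside the workflow's other outputs
cosmos = Cosmos(database_url='sqlite:///{}/cosmos.sqlite'.format(output_dir))
cosmos.initdb()
workflow = cosmos.start(name='My_Workflow')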
cosmos.api.Cosmos¶
class cosmos.api.Cosmos(database_url: str = 'sqlite:///:memory:', get_submit_args: callable = <function default_get_submit_args>, default_drm: str = 'local', default_drm_options: Optional[Dict] = None, default_queue: Optional[str] = None, default_time_req: Optional[int] = None, default_max_attempts: int = 1, flask_app=None, default_job_class: Optional[str] = None, default_environment_variables: Optional[Dict] = None)[source]¶

__init__(database_url: str = 'sqlite:///:memory:', get_submit_args: callable = <function default_get_submit_args>, default_drm: str = 'local', default_drm_options: Optional[Dict] = None, default_queue: Optional[str] = None, default_time_req: Optional[int] = None, default_max_attempts: int = 1, flask_app=None, default_job_class: Optional[str] = None, default_environment_variables: Optional[Dict] = None)[source]¶

Parameters
database_url – A sqlalchemy database url. ex: sqlite:///home/user/sqlite.db or mysql://user:pass@localhost/database_name or postgresql+psycopg2://user:pass@localhost/database_name
get_submit_args – A function that returns arguments to be passed to the job submitter, like resource requirements or the queue to submit to. See cosmos.api.default_get_submit_args() for details.
flask_app (flask.Flask) – A Flask application instance for the web interface. The default behavior is to create one.
default_drm – The default DRM to use (ex ‘local’, ‘lsf’, or ‘ge’)
default_drm_options – Default value for every Task.drm_options
default_queue – Default value for every Task.queue
default_time_req – Default value for every Task.time_req
default_environment_variables – Default value for every Task.environment_variables
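As a rough illustration (the DRM name, queue, and resource values below are arbitrary assumptions, not recommendations), several of these defaults might be set at construction time:

from cosmos.api import Cosmos

cosmos = Cosmos(
    database_url='sqlite:///my_cosmos_db.sqlite',
    default_drm='ge',                # submit Tasks to Grid Engine by default
    default_queue='all.q',           # hypothetical queue name
    default_time_req=60,             # default Task.time_req
    default_max_attempts=2,          # retry each failed Task once
    default_environment_variables={'TMPDIR': '/scratch'},  # hypothetical
)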
start(name, restart=False, skip_confirm=False, primary_log_path='workflow.log', fail_fast=False)[source]¶
Start, resume, or restart a workflow based on its name. If resuming, deletes failed tasks.

Parameters
name (str) – A name for the workflow. Must be unique for this Cosmos session.
restart (bool) – If True and the workflow exists, delete it first.
skip_confirm (bool) – If True, do not prompt the shell for input before deleting workflows or files.
primary_log_path (str) – The path of the primary log to write to. If None, does not write to a file. Log information is always printed to stderr.
fail_fast (bool) – If True, terminate the workflow the first time a Task fails. Otherwise, run all Tasks except those downstream of a failure.
default_max_attempts (int) – The default maximum number of times to attempt a Task.

Returns: A Workflow instance.
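A minimal sketch of starting (or later resuming) a workflow with these options; the log path below is a hypothetical location:

workflow = cosmos.start(
    name='My_Workflow',
    restart=False,        # set True to delete any existing workflow of this name first
    skip_confirm=True,    # do not prompt before deleting workflows or files
    primary_log_path='out/my_workflow/workflow.log',  # hypothetical log path
    fail_fast=False,
)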
cosmos.api.default_get_submit_args(task, parallel_env='orte')[source]¶
Default method for determining the extra arguments to pass to the DRM. For example, returning “-n 3” if task.drm == “lsf” would cause all jobs to be submitted with bsub -n 3.
Parameters
task (cosmos.api.Task) – The Task being submitted.

Return type
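A custom function of the same shape can be supplied to Cosmos via get_submit_args. The sketch below relies only on task.drm (referenced in the docstring above); the slot count and the empty-string fallback for the local DRM are assumptions:

def my_get_submit_args(task, parallel_env='orte'):
    # mirror the docstring example: request 3 slots per job
    if task.drm == 'lsf':
        return '-n 3'
    elif task.drm == 'ge':
        return '-pe {} 3'.format(parallel_env)
    return ''  # assumption: the local DRM needs no extra submit arguments

cosmos = Cosmos(database_url='sqlite:///my_cosmos_db.sqlite',
                get_submit_args=my_get_submit_args)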