import os
import sys
# from concurrent import futures
from datetime import datetime
from typing import Optional, Dict
from flask import Flask
from cosmos import WorkflowStatus
from cosmos import __version__
from cosmos.db import Base
from cosmos.models import Task
from cosmos.util.args import get_last_cmd_executed
from cosmos.util.helpers import confirm
def default_get_submit_args(task, parallel_env="orte"):
    """
    Default method for determining the extra arguments to pass to the DRM.

    For example, returning ``"-n 3"`` if ``task.drm == "lsf"`` would cause all jobs
    to be submitted with ``bsub -n 3``.

    :param cosmos.api.Task task: The Task being submitted.
    :param str parallel_env: Parallel environment name passed to ``-pe``. Only used
        when ``task.drm`` is ``"ge"`` or ``"drmaa:ge"``.
    :rtype: str
    :returns: The extra submit arguments, or None when ``task.drm`` is not one of
        the DRMs handled here (lsf, ge, slurm).
    """
    default_job_priority = None
    # Memory reservation on LSF is switched off; flip this flag to emit rusage[mem=...].
    use_mem_req = False

    # Slashes in the uid are replaced so the job name stays a single token.
    jobname = "%s[%s]" % (task.stage.name, task.uid.replace("/", "_"))

    if task.drm in ("lsf", "drmaa:lsf"):
        # The fragment lives inside the single -R "..." resource string built below.
        rusage = "rusage[mem=%s] " % task.mem_req if task.mem_req and use_mem_req else ""
        time_limit = " -W 0:{0}".format(task.time_req) if task.time_req else ""
        # Without this the template's {queue} placeholder had no value and
        # every LSF submission failed with a KeyError.
        queue = " -q %s" % task.queue if task.queue else ""
        return '-R "{rusage}span[hosts=1]" -n {core_req}{time_limit}{queue} -J "{jobname}"'.format(
            rusage=rusage,
            core_req=task.core_req,
            time_limit=time_limit,
            queue=queue,
            jobname=jobname,
        )

    if task.drm in ("ge", "drmaa:ge"):
        return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format(
            priority=" -p %s" % default_job_priority if default_job_priority else "",
            job_class=" -jc %s" % task.job_class if task.job_class else "",
            queue=" -q %s" % task.queue if task.queue else "",
            jobname=jobname,
            core_req=task.core_req,
            parallel_env=parallel_env,
        )

    if task.drm == "slurm":
        return "-c {cores} {partition}{jobname_str}{mem_str}{time_str}".format(
            mem_str=(" --mem %s" % task.mem_req) if task.mem_req is not None else "",
            time_str=(" --time %s" % task.time_req) if task.time_req is not None else "",
            partition=" -p %s" % task.queue if task.queue else "",
            jobname_str=" -J %s" % jobname if jobname else "",
            cores=task.core_req,
        )

    # Any other DRM (e.g. local execution) gets no extra submit arguments.
    return None
class Cosmos(object):
    """
    Application object holding the SQLAlchemy session, the optional Flask app for
    the web dashboard, and the defaults stored for later use (DRM, queue, time
    requirement, ...).

    Instances can be used as context managers; the session is closed on exit.
    """

    def __init__(
        self,
        database_url: str = "sqlite:///:memory:",
        get_submit_args: callable = default_get_submit_args,
        default_drm: str = "local",
        default_drm_options: Optional[Dict] = None,
        default_queue: Optional[str] = None,
        default_time_req: Optional[int] = None,
        default_max_attempts: int = 1,
        flask_app=None,
        default_job_class: Optional[str] = None,
        default_environment_variables: Optional[Dict] = None,
    ):
        """
        :param database_url: A `sqlalchemy database url <http://docs.sqlalchemy.org/en/latest/core/engines.html>`_. ex: sqlite:///home/user/sqlite.db or
            mysql://user:pass@localhost/database_name or postgresql+psycopg2://user:pass@localhost/database_name.
            A value without any ``:`` is treated as a path and prefixed with ``sqlite:///``.
        :param get_submit_args: a function that returns arguments to be passed to the job submitter, like resource
            requirements or the queue to submit to.  See :func:`cosmos.api.default_get_submit_args` for details
        :param default_drm: The Default DRM to use (ex 'local', 'lsf', or 'ge')
        :param default_drm_options: Default value for every Task.drm_options
        :param default_queue: Default value for every Task.queue
        :param default_time_req: Default value for every Task.time_req
        :param default_max_attempts: Default maximum number of attempts, stored on the instance.
        :param flask.Flask flask_app: A Flask application instance for the web interface.  The default behavior is to create one.
        :param default_job_class: Default job class, stored on the instance.
        :param default_environment_variables: Default value for every Task.environment_variables
        :raises AssertionError: if the DRM name is not supported or the database url is malformed.
        """
        default_drm_options = {} if default_drm_options is None else default_drm_options

        # Avoid cyclical import dependencies
        from cosmos.job.drm.DRM_Base import DRM

        # Only the part before the first ":" names the DRM (e.g. "drmaa:lsf" -> "drmaa").
        drm_name = default_drm.split(":")[0]
        assert drm_name in DRM.get_drm_names(), "unsupported drm: %s" % drm_name

        if ":" not in database_url:
            # A bare path: treat it as a sqlite database file.
            database_url = f"sqlite:///{database_url}"
        assert "://" in database_url, "Invalid database_url: %s" % database_url

        if flask_app:
            self.flask_app = flask_app
        else:
            try:
                self.flask_app = Flask(__name__)
                self.flask_app.secret_key = os.urandom(24)

                @self.flask_app.teardown_appcontext
                def shutdown_session(exception=None):
                    # self.session is assigned further down; it exists by the
                    # time any app context is torn down.
                    self.session.remove()

                # NOTE(review): datetime.now() is evaluated once, here, so templates
                # see the construction time rather than the render time -- confirm
                # this is intended.
                self.flask_app.jinja_env.globals["time_now"] = datetime.now()
            except NotImplementedError:
                # Run without a web interface when Flask cannot be set up.
                self.flask_app = None

        self.get_submit_args = get_submit_args

        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker, scoped_session

        engine = create_engine(database_url, convert_unicode=True)
        self.session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
        # NOTE(review): a throwaway declarative_base() used to be created here, shadowing
        # the imported cosmos.db.Base, and a query property was attached to it. Nothing
        # else ever referenced that local class, so it has been removed.

        self.default_drm = default_drm
        self.default_drm_options = default_drm_options
        self.default_job_class = default_job_class
        self.default_queue = default_queue
        self.default_max_attempts = default_max_attempts
        self.default_time_req = default_time_req
        self.default_environment_variables = default_environment_variables

    def close(self):
        """Close the SQLAlchemy session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Exceptions are not suppressed; the session is closed either way.
        self.close()

    def start(
        self, name, restart=False, skip_confirm=False, primary_log_path="workflow.log", fail_fast=False,
    ):
        """
        Start, resume, or restart a workflow based on its name.  If resuming, deletes failed tasks.

        :param str name: A name for the workflow.  Must be unique for this Cosmos session.
        :param bool restart: If True and the workflow exists, delete it first.
        :param bool skip_confirm: If True, do not prompt the shell for input before deleting workflows or files.
        :param str primary_log_path: The path of the primary log to write to.  If None, does not write to a file.  Log information is always printed to stderr.
        :param bool fail_fast: If True, terminate the workflow the first time a Task fails.
            Otherwise, run all Tasks except those downstream of a failure.
        :rtype: Workflow
        :returns: A Workflow instance.
        :raises SystemExit: if the user declines one of the confirmation prompts.
        """
        from .Workflow import Workflow
        from ..util.helpers import mkdir

        assert os.path.exists(os.getcwd()), (
            "The current working dir of this environment, %s, does not exist" % os.getcwd()
        )

        # Make sure the directory that will hold the primary log exists.
        if primary_log_path is not None and os.path.dirname(primary_log_path):
            mkdir(os.path.dirname(primary_log_path))

        session = self.session

        # When restarting, the new workflow is created with the id of the one it replaces.
        old_id = None
        if restart:
            wf = session.query(Workflow).filter_by(name=name).first()
            if wf:
                old_id = wf.id
                msg = "Restarting %s.  Are you sure you want to delete the all sql records?" % wf
                if not skip_confirm and not confirm(msg):
                    raise SystemExit("Quitting")
                wf.delete(delete_files=False)
            else:
                if not skip_confirm and not confirm(
                    "Workflow with name %s does not exist, "
                    "but `restart` is set to True.  "
                    "Continue by starting a new Workflow?" % name
                ):
                    raise SystemExit("Quitting")

        # Look the workflow up (again): if one is found here we are resuming it.
        wf = session.query(Workflow).filter_by(name=name).first()
        if wf:
            if not skip_confirm and not confirm(
                "Resuming %s.  All non-successful jobs will be deleted, "
                "then any new tasks in the graph will be added and executed.  "
                "Are you sure?" % wf
            ):
                raise SystemExit("Quitting")

            wf.successful = False
            wf.finished_on = None
            wf.status = WorkflowStatus.resuming

            wf.log.info("Resuming %s" % wf)
            session.add(wf)

            failed_tasks = [t for s in wf.stages for t in s.tasks if not t.successful]
            n = len(failed_tasks)
            if n:
                wf.log.info(
                    "Deleting %s unsuccessful task(s) from SQL database, delete_files=%s" % (n, False)
                )
                for t in failed_tasks:
                    session.delete(t)

            for stage in [s for s in wf.stages if len(s.tasks) == 0]:
                wf.log.info("Deleting stage %s, since it has 0 successful Tasks" % stage)
                session.delete(stage)
        else:
            # Start from scratch.
            wf = Workflow(id=old_id, name=name, manual_instantiation=False, successful=False)
            session.add(wf)

        wf.info["last_cmd_executed"] = get_last_cmd_executed()
        wf.info["cwd"] = os.getcwd()
        wf.info["fail_fast"] = fail_fast
        wf.primary_log_path = primary_log_path

        wf.log.info("Committing SQL session...")
        session.commit()
        # Detach everything from the session, then re-attach only the workflow.
        session.expunge_all()
        session.add(wf)

        wf.log.info("Execution Command: %s" % get_last_cmd_executed())

        wf.cosmos_app = self

        return wf

    def initdb(self):
        """
        Initialize the database via sql CREATE statements.  Tables that already exist are
        left untouched by ``create_all``; a ``MetaData`` row recording the library version
        is added and committed on every call.

        :returns: self, to allow chaining.
        """
        Base.metadata.create_all(bind=self.session.bind)
        from ..db import MetaData

        meta = MetaData(initdb_library_version=__version__)
        self.session.add(meta)
        self.session.commit()
        return self

    def resetdb(self):
        """
        Resets (deletes then initializes) the database.  This is not reversible!

        :returns: self, to allow chaining.
        """
        Base.metadata.drop_all(bind=self.session.bind)
        self.initdb()
        return self

    def shell(self):
        """
        Launch an IPython shell with useful variables already imported.
        """
        from .Workflow import Workflow

        # These locals look unused, but they are deliberately defined so they are
        # available by name inside the embedded IPython shell.
        cosmos_app = self
        session = self.session
        workflows = self.session.query(Workflow).order_by("id").all()
        # The workflow with the highest id, or None when there are no workflows.
        wf = workflows[-1] if len(workflows) else None

        import IPython

        IPython.embed()

    def init_flask(self):
        """
        Build the Cosmos blueprint for this session and register it on the Flask app.

        :returns: The Flask application instance.
        """
        from cosmos.web.views import gen_bprint

        self.cosmos_bprint = gen_bprint(self.session)
        self.flask_app.register_blueprint(self.cosmos_bprint)
        return self.flask_app

    def runweb(self, host, port, debug=True):
        """
        Starts the web dashboard

        :param str host: Host name to bind to.  Commonly 0.0.0.0 to allow outside internet traffic.
        :param int port: Port to bind to.
        :param bool debug: Passed through to ``Flask.run``.
        """
        self.init_flask()
        return self.flask_app.run(debug=debug, host=host, port=port)