wdlRunR

Follow development on GitHub.

This package provides a simple API to the Broad Institute's Cromwell workflow engine.

Install

For the time being, use the develop branch:

require(devtools)
## Loading required package: devtools
devtools::install_github('seandavi/wdlRunR', ref = "develop")
## Skipping install of 'wdlRunR' from a github remote, the SHA1 (d70de8ab) has not changed since last install.
##   Use `force = TRUE` to force installation

Background

Big Data in biology is a reality. Operating on large-scale biological data, particularly when the data flows involve interdependencies and multiple steps, requires smart workflow management tools. Such workflow managers should, ideally:

  • Allow scaling of tasks across available compute infrastructure.
  • Encourage reproducible research by enabling and encouraging workflows to be described using text files.
  • Maintain provenance over tools, workflows, inputs, and outputs.
  • Be robust to failure.
  • Support restarts and, potentially, work caching to reduce redundant recompute and cost.
  • Support multiple compute resources, enabling workflow portability and reuse.

Cromwell is such a workflow system. The [wdlRunR](https://github.com/seandavi/wdlRunR) R package allows users of the R statistical programming environment to interact with Cromwell and can act as the “orchestrator” for job submission. The R ecosystem provides functionality for documentation, literate programming, publication-quality graphics, statistical analysis, and powerful data and metadata manipulation. Workflows of nearly arbitrary size can be described in R and then handed to Cromwell for the large-scale data processing steps, while the entire workflow (data munging, workflow metadata management, cloud orchestration and data management, and analysis of processed data) is described as R code or literate programming documents.

Cromwell is distributed as a Java jar file and runs either as a server or as a command-line executor of single workflows. This package focuses on interacting with Cromwell as a server, accessing it via its documented REST API.

Cromwell jobs are described using the Workflow Description Language (WDL).

Getting started

As a first step, we download the current released version of Cromwell. We then run the server and put the log file into a tempdir() using R code and a call to system().

library(wdlRunR)
cromwell_jar = getCromwellJar(cromwell_version='24')
cromwell_log = file.path(tempdir(),'cromwell.log')
system(sprintf('java -jar %s server > %s 2>&1 &', cromwell_jar, cromwell_log))
# let server start up
Sys.sleep(20)
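A fixed `Sys.sleep(20)` can be too short on a slow machine and wastefully long on a fast one. As a minimal sketch, not part of wdlRunR, the helpers below poll until something accepts TCP connections on the server port. The names `wait_until()` and `port_is_open()` are hypothetical, and port 8000 is assumed from the `cromwell_base` option used later in this document. An open port only shows that a process is listening, not that Cromwell has finished initializing.

```r
# Hypothetical helper: call `check()` repeatedly until it returns TRUE
# or until `timeout` seconds have elapsed. Returns TRUE on success, FALSE on timeout.
wait_until <- function(check, timeout = 60, interval = 1) {
  deadline <- Sys.time() + timeout
  repeat {
    if (isTRUE(check())) return(TRUE)
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
  }
}

# Hypothetical check: TRUE if a TCP connection to host:port can be opened.
port_is_open <- function(host = "localhost", port = 8000) {
  con <- tryCatch(
    suppressWarnings(
      socketConnection(host, port, open = "r+", blocking = TRUE, timeout = 1)
    ),
    error = function(e) NULL
  )
  if (is.null(con)) return(FALSE)
  close(con)
  TRUE
}

# Usage (not run here): wait up to 60 seconds for the server port.
# wait_until(port_is_open, timeout = 60)
```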

Cromwell should now be running in the background and we can verify by checking for processes with “cromwell” in the name.

system('ps aux | grep cromwell')

Workflows

The simplest example (and one that runs in the time needed for a demo!) is a “Hello, WDL” workflow. Right now, we simply describe a WDL workflow as text or as a text file (see below).

hello_wdl = "task hello {
  String name

  command {
    echo 'Hello ${name}!'
  }
  output {
    File response = stdout()
  }
}

workflow test {
  call hello
}"
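Because a workflow is just text, it can live in a file as easily as in an R string. The sketch below uses only base R to write the same “Hello, WDL” text to a file and read it back into a single string. The file name `hello.wdl` and the variable `hello_wdl_from_file` are illustrative choices.

```r
# The "Hello, WDL" workflow held as a single R string.
hello_wdl <- "task hello {
  String name

  command {
    echo 'Hello ${name}!'
  }
  output {
    File response = stdout()
  }
}

workflow test {
  call hello
}"

# Write the workflow to a file (illustrative path) ...
wdl_file <- file.path(tempdir(), "hello.wdl")
writeLines(hello_wdl, wdl_file)

# ... and read it back as one string, restoring the newlines.
hello_wdl_from_file <- paste(readLines(wdl_file), collapse = "\n")

# The round trip should reproduce the original text.
identical(hello_wdl, hello_wdl_from_file)
```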

Submit a batch job

We are going to submit a batch of jobs (10 of them) with randomly-generated names. The input to the cromwellBatch() function is a data.frame with columns named for the WDL inputs. Each row of the data.frame will become a workflow that will be run (locally in this case) by Cromwell.

options(cromwell_base = 'http://localhost:8000')
randomStrings = sapply(1:10,function(r) {paste(sample(LETTERS,10),collapse="")})
wdlInputs = data.frame(test.hello.name=randomStrings)
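To make the “one row, one workflow” idea concrete, the base R sketch below splits a small inputs data.frame into one named list per row. Each list has the shape of a single workflow's inputs object, keyed by the fully qualified WDL input name. This illustrates the mapping only. It is an assumption, not a description of how `cromwellBatch()` is implemented, and the example names are made up.

```r
# Example inputs: one column per WDL input, one row per workflow.
wdlInputs <- data.frame(
  test.hello.name = c("ALICE", "BOB", "CAROL"),
  stringsAsFactors = FALSE
)

# Split the data.frame into one named list per row.
inputs_per_workflow <- lapply(seq_len(nrow(wdlInputs)), function(i) {
  as.list(wdlInputs[i, , drop = FALSE])
})

# Three rows give three sets of workflow inputs.
length(inputs_per_workflow)

# Inputs for the second workflow.
str(inputs_per_workflow[[2]])
```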

We can submit a batch of jobs to Cromwell by posting to the correct API endpoint; the function cromwellBatch() wraps this process in R.

res = cromwellBatch(wdlSource = hello_wdl,workflowInputs=wdlInputs)
# and we do this to allow the jobs to get running
Sys.sleep(20)

Monitoring jobs

Once jobs are submitted, they will enter the Cromwell workflow monitoring system. We can query this system at any time. Look at the API documentation and help('cromwellQuery') in R for some more details.

We can use any R functionality to manage our WDL and inputs. For example, we can fetch a WDL workflow hosted on GitHub. Here is an example that uses the same “Hello, WDL” workflow.

#read from github or other URL
library(httr)
hello_remote_wdl = content(GET("https://raw.githubusercontent.com/DockstoreTestUser/dockstore-whalesay/master/Dockstore.wdl"),'text')

And we can use R for creating inputs in any way we like. Here, we use the babynames R package to set the names.

if(require(babynames))
    wdlInputs = data.frame(test.hello.name=sample(babynames$name,10))
z = cromwellBatch(wdlSource = hello_remote_wdl,workflowInputs=wdlInputs)
Sys.sleep(20)
cromwellQuery()

To make things a bit more interesting, we can simulate long-running jobs using a “sleeping” variation of “Hello, WDL.”

hello_wdl_sleep = "task hello {
  String name

  command {
    echo 'Hello ${name}!' && sleep 60
  }
  output {
    File response = stdout()
  }
}

workflow test {
  call hello
}"
randomStrings = sapply(1:10,function(r) {paste(sample(LETTERS,10),collapse="")})
wdlInputs = data.frame(test.hello.name=randomStrings)
z = cromwellBatch(wdlSource = hello_wdl_sleep,workflowInputs=wdlInputs)
# to let Cromwell get started with the jobs.
Sys.sleep(10)

At this point, checking cromwellQuery() will show running jobs (if done within 60 seconds, of course). Limiting to “Running” jobs is also easy.

cromwellQuery(term="status=Running")

And we can check for outputs as well.

results = cromwellQuery(term="status=Succeeded")
outputs = cromwellOutputs(results$id)
str(outputs,list.len=4)
readLines(outputs[[1]]$outputs$test_hello_response)
logs = cromwellLogs(results$id)
# stderr -- should be empty for this task
readLines(logs[[1]]$test.hello[[1]]$stderr)
# stdout
readLines(logs[[1]]$test.hello[[1]]$stdout)

Cleanup

# messy!
unlink(cromwell_jar)
unlink(cromwell_log)
system('pgrep -f cromwell | xargs -I {} kill -9 {}')
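Killing every process whose command line matches “cromwell” can also hit unrelated processes. A tidier option, sketched here for Unix-like systems only, is to record the server's process ID at launch and signal just that process. The helper `start_background()` is hypothetical and not part of wdlRunR. It relies on the shell variable `$!` and on `tools::pskill()`, which sends SIGTERM by default.

```r
# Hypothetical helper: run a shell command in the background and return its PID.
# Output is redirected to `log` so that system(intern = TRUE) returns right away.
start_background <- function(command, log = tempfile(fileext = ".log")) {
  out <- system(
    sprintf("%s > %s 2>&1 & echo $!", command, shQuote(log)),
    intern = TRUE
  )
  as.integer(out[1])
}

# Usage (not run here): start the server, keep its PID, stop it later.
# pid <- start_background(sprintf("java -jar %s server", shQuote(cromwell_jar)),
#                         cromwell_log)
# tools::pskill(pid)  # SIGTERM to that one process only
```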

sessionInfo

sessionInfo()