The functions in the RMINC library make parallel operations easy. However, jobs can sometimes die on the cluster with no way to resume: a single failed job means the whole function needs to be re-run. This is especially annoying for voxelwise mixed-effects models, which require parallelization to be feasible. RMINC performs parallelization using the batchtools package. This tutorial uses the batchtools package directly to run voxelwise statistics and resubmit failed jobs. It has the added benefit of letting you run any test in a voxelwise manner.

Setup 

Log into hpf and qlogin

ssh -AX hpf.ccm.sickkids.ca 
[password]
qlogin -X -l vmem=48G,walltime=24:00:00

Load necessary modules.

module use /hpf/largeprojects/MICe/tools/modulefiles
module load mice-env

Start R and load the necessary libraries. I also load lme4 here because the voxelwise function below fits models with lmer().

library(RMINC)
library(tidyverse)
library(lme4)
library(batchtools)

Read the data for this tutorial.

df = read.csv('/hpf/largeprojects/MICe/dfernandes/social_behaviour_study/scanbase/tutorial/tutorial.csv', stringsAsFactors=F)

Step 1: Write your voxelwise function

In this example, we will run two linear mixed-effects models on the Scanbase data and perform the log-likelihood ratio test to compare the models and obtain significance for each voxel. I am going to call this function 'vxl_func'.

# takes in a data frame with predictor variables (df)
#  x is the vector of values at a particular voxel (response variable)

vxl_func = function(x,df) {
    lmdf = mutate(df, volume = x)
    mod1 = lmer ( volume ~ Genotype_Code + (1|Study_Name) , lmdf , REML = F )
    mod2 = lmer ( volume ~ (1|Study_Name)                 , lmdf , REML = F )
    anova(mod1,mod2)[2,'Pr(>Chisq)']
}
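
Before going parallel, it can be worth a quick sanity check that the function runs on a single voxel's worth of data. Here I just feed it random numbers; this assumes your data frame has the Genotype_Code and Study_Name columns used in the model above.

# quick sanity check on fake data: should return a single p-value
vxl_func(rnorm(nrow(df)), df)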

Step 2: Write a function that reads the voxel data and applies the function

We are eventually going to apply 'vxl_func' to every voxel. But to make the problem more manageable, we will apply this function to one chunk of voxels at a time (indexed by the vector 'idx'). Don't worry if you don't know what I mean by 'indexed'; you probably don't need to.

This function (called 'par_func') is going to read our data (given as minc files in the 'mncfiles' column of the data frame 'df'), mask it (using the minc file 'mask'), extract only a chunk of voxels (indexed by the vector 'idx'), and run a voxelwise function ('vxl_func') on that chunk.

# idx is an integer vector indexing the chunk of voxels
# mask is the mask file (values above 0.5 are taken)
# mncfiles is the column in df that has the minc files we want to read and analyse
# vxl_func is the function we will apply to every voxel

par_func = function(idx , mask, mncfiles, df, vxl_func) {
   maskvol = mincGetVolume(mask) > 0.5

   anatarr = do.call('cbind',lapply(df[,mncfiles] , function(x) mincGetVolume(x)[maskvol][idx] ))

   apply( anatarr , 1 , function(x) vxl_func(x,df) )
}

Step 3: Create chunks for voxels

Create chunks of voxels. We will create chunks of 10000 voxels each (i.e. each parallel job will do only 10000 voxels). You can change this as you please: for faster jobs, increase the number of voxels per parallel job (e.g. 100000); for slower jobs, decrease it (e.g. 1000). We will just make a function for chunking, and I'll let you pick how many voxels per job you want.

# change this mask to a mask in your study
mask = '/hpf/largeprojects/MICe/dfernandes/social_behaviour_study/scanbase/registration/scanbase_mask.mnc'
maskvol = mincGetVolume(mask)

# index all the voxels
idxs = 1:(sum(maskvol>0.5))

# function for chunking. 'x' is an index vector denoting the voxels. 'sz' is the number of voxels per chunk.
chnk.mk = function(x,sz) split(x, ceiling(seq_along(x)/sz))

# chunkify into groups of 10000 voxels each
chnkidx = chnk.mk(idxs,10000)
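
Before submitting hundreds of jobs, you may want to run 'par_func' serially on a few voxels of the first chunk to check that everything works. This assumes 'mncfiles' is the name of the column in 'df' holding the paths to your minc files.

# test the chunked function on the first 5 voxels of the first chunk
test_p = par_func(chnkidx[[1]][1:5], mask, 'mncfiles', df, vxl_func)
test_p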

Step 4: Request resources for your parallel jobs

Set the resources you want for your jobs. Keep in mind that the more you demand, the longer it takes for your jobs to start running. But, of course, if you don't demand enough, your jobs will fail. Below, I request 16G of memory and 2 hours of walltime for each job. I also cap the number of jobs allowed on the queue at any one time to 1000. If you submit more than 1000 jobs, batchtools will automatically wait until some jobs have finished before submitting more, so you never have more than 1000 jobs on the queue at any given time. The cap is not needed for this example, but may come in handy some other time.


# set default resources
default.resources <-
  list(nodes = 1,
       memory = "16G",
       walltime = "02:00:00",
       max.concurrent.jobs = 1000)

You also need a template script. RMINC ships with a default one you can use. Run this command to find it:

tmplscr = system.file("parallel/pbs_script.tmpl", package = "RMINC")

Feel free to make your own template.
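
If you do write your own, a minimal TORQUE/PBS template might look something like the sketch below. This is only an illustrative sketch, not the RMINC template itself; batchtools templates are brew files, and the variables shown ('job.name', 'log.file', 'resources', 'uri') are filled in by batchtools at submission time.

#!/bin/bash
#PBS -N <%= job.name %>
#PBS -o <%= log.file %>
#PBS -j oe
#PBS -l nodes=<%= resources$nodes %>,vmem=<%= resources$memory %>,walltime=<%= resources$walltime %>

## batchtools substitutes the path to the serialized job collection here
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'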

Step 5: Create registry and run parallel jobs

You are finally ready to create and run the parallel jobs.

# create registry; the RMINC, tidyverse, and lme4 libraries are loaded on each worker
reg1=makeRegistry('eval_anv',packages = c('RMINC','tidyverse','lme4'))

# set template script
reg1$cluster.functions=makeClusterFunctionsTORQUE(tmplscr)

# set default resources
reg1$default.resources=default.resources

# map job ids
ids1 = batchMap(
             reg = reg1,
             fun = par_func,
             idx = chnkidx ,
             more.args = list( 
                     mask = mask , 
                     mncfiles = 'mncfiles', 
                     df = df, 
                     vxl_func = vxl_func)
             )
# you don't need the 'reg = reg1' argument because 
#  batchtools assumes you mean the previously loaded registry. 

## run 10 jobs per node (I am not going to do it here)
#   ids1[, chunk := chunk(job.id, chunk.size = 10)]

# submit jobs
#   you don't need the 'reg = reg1' argument. 
#   batchtools assumes you mean the previously loaded registry. 
submitJobs(reg=reg1,ids=ids1)

# you can wait till jobs are done
# again, you don't need the 'reg = reg1' argument. 
waitForJobs(reg=reg1)

If you made a mistake and want to kill all the jobs, run this:

killJobs()


If jobs fail, you can use different functions to help you debug.

# Tells you which jobs did not finish
findNotDone() # or findNotDone(reg = reg1)

# Tells you which jobs gave errors
findErrors()

# get Error messages for all jobs that gave errors
getErrorMessages()

# get log for job 80
getLog(id=80)
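
For a quick overview of all of the above, getStatus() prints a summary of job states:

# summary of how many jobs are queued, started, done, errored, or expired
getStatus()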


Sometimes jobs fail for no apparent reason because they are dropped by the cluster.

You can easily reset and resubmit jobs.

# kill jobs that are not finished
killJobs( findNotDone() )

# reset jobs that are not finished
resetJobs( findNotDone() )

# submit jobs that are not finished and give them 4 hours of walltime
submitJobs( findNotDone() , resources=list(walltime = "04:00:00"))

# wait for newly submitted jobs
waitForJobs( findNotDone() )

You can make this into a loop to automatically resubmit dropped jobs, as sketched below. This is dangerous, though: jobs could have been dropped because of an error, and resubmitting does nothing to fix bugs in YOUR code. You should always check whether you have actual errors before you resubmit.
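
A minimal sketch of such a loop, which stops as soon as any job reports a real error:

# resubmit jobs dropped by the cluster, but stop on real errors
repeat {
    waitForJobs()                       # wait for all submitted jobs to terminate
    if (nrow(findErrors()) > 0)
        stop('some jobs errored; check getErrorMessages() before resubmitting')
    dropped = findNotDone()             # jobs dropped/expired on the cluster
    if (nrow(dropped) == 0) break       # everything finished
    resetJobs(dropped)
    submitJobs(dropped)
}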

Step 6: Gather results and create a voxel statistics map

Once the jobs finish, we need to gather the results together. This can be done with the 'reduceResultsList' function, which gathers the results into a list. For this example, I will convert the list into a vector and then correct for multiple comparisons. The vector has one element for each voxel inside the mask. We can then create a volume with the voxelwise statistics. This statistics volume can be used with RMINC/MRIcrotome visualization tools or written to a minc file.


# gather voxelwise results
vxl_pvalues = unlist(reduceResultsList(reg=reg1))
vxl_fdr = p.adjust(vxl_pvalues,method='fdr')

# create statistics volume
statsvol = maskvol
statsvol[ maskvol > 0.5 ]  = vxl_fdr
statsvol[ maskvol <= 0.5 ] = 1

## write statsvol to minc file if you want
# mincWriteVolume(statsvol,output.filename='genotype_fdr.mnc')

Step 7: Delete or load registry

You can delete the registry when you are done to save disk space.


removeRegistry(wait=0,reg=reg1)

If you do NOT delete the registry, you can load it again in a different R session.

reg1 = loadRegistry('eval_anv')

# The loaded registry is not writable by default, so you can only view results, not change them.
#  This is useful if you don't want to accidentally change results
#  To make the registry writeable:
reg1 = loadRegistry('eval_anv', writeable=T)