| Title: | Utilities to manage, estimate speed, optimize use of parallel clusters |
|---|---|
| Description: | Miscellaneous utilities developed by the Predictive Ecology Group (<http://predictiveecology.org>). |
| Authors: | Eliot J B McIntire [aut, cre] (ORCID: <https://orcid.org/0000-0002-6914-8316>), His Majesty the King in Right of Canada, as represented by the Minister of Natural Resources Canada [cph] |
| Maintainer: | Eliot J B McIntire <[email protected]> |
| License: | GPL-3 |
| Version: | 0.0.22 |
| Built: | 2026-06-04 19:39:12 UTC |
| Source: | https://github.com/PredictiveEcology/clusters |
This includes copying files over to unique(cores) machines, then loading all objects from disk in each of the parallel cores.
clusterSetup(messagePrefix = "DEoptim_", itermax = 500, trace = TRUE, strategy = 3, initialpop = NULL, NP = NULL, cores, logPath, libPath, objsNeeded, pkgsNeeded, nCoresNeeded = 100, envir = parent.frame())
A list of items that can be passed to DEoptim.control()
This is for cleaning up cases where an interrupted optimization is leading to multiple files for the same .runName. This will remove duplicates, keeping only the most recent.
dirNew(path, secsAgo = Inf, after = Sys.time() - secsAgo, pattern = "^(.+)\\_[[:digit:]]{6,8}.*\\.png")
path |
A folder in which to search for duplicates |
pattern |
The regular expression to search for, to identify the files. This must have 1 set of parentheses (), as only the content between the () will be used for duplicate assessment, i.e., remove anything in the file that shouldn't be used. |
delete |
Logical. Default |
For side effects: removed files
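As a minimal sketch of how the single capture group in pattern can define what counts as a duplicate, the following applies base R's sub() with the default pattern from the usage above. The file names are hypothetical, and this illustrates the regular expression only; it is not the package's internal code.

```r
# Default pattern from the usage above; the single (.+) group is the duplicate key
pattern <- "^(.+)\\_[[:digit:]]{6,8}.*\\.png"

# Hypothetical file names, for illustration only
files <- c("runA_123456.png", "runA_654321_extra.png", "runB_20240101.png")

# Keep only the text captured by the parentheses
keys <- sub(pattern, "\\1", files)

# TRUE marks a file whose key has already been seen earlier in the vector
isDup <- duplicated(keys)
print(data.frame(file = files, key = keys, duplicate = isDup))
```

Anything outside the parentheses (here the numeric suffix and whatever follows it) is ignored when files are compared.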
Lightweight wrapper for parallelly::makeClusterPSOCK
makeClusterPSOCK(workers, outfile = NULL, rscript_libs = NULL, ..., port = NULL, rshopts = c("-o", "ExitOnForwardFailure=yes"), tries = 5L, delay = 5, renice = 20, revtunnel = TRUE)
workers |
The hostnames of workers (as a character vector) or the number of localhost workers (as a positive integer). |
outfile |
Optional log file path. |
rscript_libs |
Optional library paths for workers. |
... |
Additional arguments passed to parallelly::makeClusterPSOCK. |
port |
Optional port or port block start. |
tries, delay |
Maximum number of attempts done to launch each node with |
renice |
A numerical 'niceness' (priority) to set for the worker processes. |
revtunnel |
If TRUE, a reverse SSH tunnel is set up for each worker such that the worker R process sets up a socket connection to its local port |
Continuously queries each worker for the number of active threads via
clusters::numActiveThreads() and displays a single, aligned status line
that updates in place. It also tracks the peak thread count per core
(named by the cores vector). On interrupt (Ctrl-C), it prints a final
aligned summary of peak values and returns them invisibly.
monitorCluster(cl, cores, pad = 2, interval = 1)
cl |
A PSOCK cluster object (e.g., created with |
cores |
A character vector of core (host) names. The order and names define column layout and are used to name the returned peak vector. |
pad |
Integer number of spaces to insert between columns (default: 2). |
interval |
Number of seconds to wait between refreshes (default: 1). |
The display renders a one-line header of core names (left-aligned),
followed by a live one-line status of current counts (right-aligned within
the width of each core name). Lines are redrawn in-place using ANSI erase
sequences (\033[2K\r) which are supported in RStudio Server and most
ANSI-aware terminals.
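The layout described above can be sketched as follows. This is an illustration under assumptions, not the function's actual source: the host names and counts are made up, and base R string padding stands in for whatever formatting the package uses.

```r
# Hypothetical host names and current thread counts, for illustration only
cores  <- c("birds", "biomass", "fire")
counts <- c(3L, 12L, NA)
pad    <- 2

sep    <- strrep(" ", pad)   # spaces between columns
widths <- nchar(cores)       # each column is as wide as its core name

# Header: core names separated by `pad` spaces
header <- paste(cores, collapse = sep)

# Values: right-aligned within the width of each core name
vals   <- ifelse(is.na(counts), "NA", as.character(counts))
padded <- paste0(strrep(" ", pmax(0, widths - nchar(vals))), vals)
status <- paste(padded, collapse = sep)

cat(header, "\n", sep = "")
# "\033[2K" erases the current line and "\r" returns the cursor to column 1
cat("\033[2K\r", status, sep = "")
cat("\n")
```

Because every value is padded to the width of its core name, the status line has the same length as the header and the columns stay aligned across refreshes.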
The function calls clusters::numActiveThreads() on each worker.
If a worker errors, that tick treats the value as NA for display
and as 0 when updating the peak (so peaks are not reduced by NAs).
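A small sketch of that peak-update rule, using simulated ticks in place of real worker queries. The host names and values are hypothetical.

```r
cores <- c("birds", "biomass", "fire")            # hypothetical host names
peak  <- setNames(integer(length(cores)), cores)  # peaks start at 0

# Three simulated ticks; NA stands for a worker that errored on that tick
ticks <- list(c(2L, 5L, 1L), c(NA, 7L, 0L), c(1L, NA, 3L))

for (cur in ticks) {
  forPeak <- ifelse(is.na(cur), 0L, cur)  # NA counts as 0 for the peak only
  peak    <- pmax(peak, forPeak)          # element-wise running maximum
}
print(peak)
```

Replacing NA with 0 before taking the maximum means a failed query can never lower or erase a previously recorded peak.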
The header is printed once. Each subsequent update clears and redraws only the live values line using ANSI sequences. If ANSI is not supported by the current console, the escape codes may appear literally; in that case, run the function in RStudio Server, a modern terminal, or adapt it to use fallbacks.
The function runs until interrupted (e.g., Ctrl-C). On interrupt, it clears the live line, prints the final peaks aligned under the header, and returns the named peak vector invisibly.
Invisibly returns a named integer vector of peak active thread counts,
with names matching cores. Also prints a final aligned summary on
interrupt.
The clusters package must be installed on each worker. This function
issues parallel::clusterEvalQ(cl, requireNamespace("clusters")) once
to assert availability, but it does not install packages remotely.
This function uses ANSI escape sequences to clear and redraw a single line
("\033[2K\r"). RStudio Server consoles interpret ANSI by default.
If your output is redirected (non-TTY) or ANSI is disabled, consider
adapting the clearing step to use a fallback (e.g., overwrite with spaces).
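One possible shape for such a fallback is sketched below. The helper name clearLine and the use of isatty() to choose a mode are assumptions for illustration; isatty() is only a rough heuristic and may report FALSE in consoles that do interpret ANSI.

```r
# Hypothetical helper: build the string that resets the current console line
clearLine <- function(useANSI, width = 80L) {
  if (useANSI) {
    "\033[2K\r"                              # erase line, return to column 1
  } else {
    paste0("\r", strrep(" ", width), "\r")   # overwrite with spaces instead
  }
}

# Assumption: treat a terminal attached to stdout as ANSI-capable
useANSI <- isatty(stdout())

for (i in 1:3) {
  cat(clearLine(useANSI), "tick ", i, sep = "")
  Sys.sleep(0.1)
}
cat("\n")
```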
makeClusterPSOCK,
clusterEvalQ,
numActiveThreads
    ## Not run:
    if (FALSE) {
      library(parallelly)
      library(parallel)
      cores <- c("birds", "biomass", "camas", "carbon", "caribou", "coco", "core", "dougfir",
                 "fire", "mpb", "sbw", "mega", "acer", "abies", "pinus")
      # Create a PSOCK cluster over SSH (example; configure to your environment)
      cl <- parallelly::makeClusterPSOCK(workers = cores, rshcmd = "ssh", homogeneous = FALSE)
      # Watch and track peaks; press Ctrl-C to stop.
      peak <- monitorCluster(cl, cores, pad = 2, interval = 1)
      # After interrupt, 'peak' is a named integer vector with per-core maxima.
      print(peak)
      stopCluster(cl)
    }
    ## End(Not run)
This only works on operating systems that provide the ps utility (e.g., Linux and other Unix-alikes), as it uses ps.
numActiveThreads(pattern = "", minCPU = 50)
pattern |
An optional search pattern to look for when identifying threads that are active. If left at default, then all threads that are active will count. |
minCPU |
The minimum CPU (in percent, i.e., 0 to 100) that a thread must be using for it to count as actively being used. |
An integer representing the current number of threads that are running
at a CPU% greater than minCPU. This can be used, e.g., with
parallelly::availableCores() to estimate the number of cores that are available
to be used.
This does not address memory or disk use issues.
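A rough sketch of the kind of counting described above, assuming ps-style output with a %CPU column followed by a command column. The sample lines, the helper name countActive, and totalCores are all hypothetical; this is not the package's implementation, and in practice the total might come from parallelly::availableCores().

```r
# Made-up output resembling `ps -eo pcpu,comm`; values are illustrative only
psOut <- c(
  "%CPU COMMAND",
  "97.3 R",
  "88.0 R",
  " 0.5 bash",
  "62.1 python3",
  " 3.2 R"
)

# Hypothetical helper: count rows above minCPU whose command matches pattern
countActive <- function(psLines, pattern = "", minCPU = 50) {
  body <- trimws(psLines[-1])   # drop the header row
  cpu  <- as.numeric(sub("^([0-9.]+)[[:space:]]+.*$", "\\1", body))
  cmd  <- sub("^[0-9.]+[[:space:]]+", "", body)
  sum(cpu > minCPU & grepl(pattern, cmd))
}

nAll <- countActive(psOut)                   # any command
nR   <- countActive(psOut, pattern = "^R$")  # only processes named R

totalCores <- 8L                             # hypothetical machine size
freeCores  <- max(0L, totalCores - nAll)     # estimate of idle cores
print(c(active = nAll, activeR = nR, free = freeCores))
```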
    # This will show the active number, updated every 0.5 seconds
    cat("Number Active CPUs right now:\n")
    while (TRUE) {
      numAC <- clusters::numActiveThreads()
      cat("\r"); cat(numAC); cat(" ")
      Sys.sleep(0.5)
    }
Plot the outputs from testMachine
plotMachine(Ncores, coreTimes, estTotTime, optimisticTotTime, systemTimes, N = 100, nam, detectedCores)

    par(mfrow = c(2, length(hosts)))
    Map(out = outs, nam = names(outs), function(out, nam)
      do.call(plotMachine, append(out, list(nam = nam))))
Uses vmstat (must be installed; it is by default on linux).
resourcesUsed(machines = "localhost", resource = "us")
machines |
Character vector of the name(s) of the PSOCK resource to query, e.g., |
resource |
Column extracted in vmstat. Defaults to "us". |
Returns the outputs from vmstat
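To show what extracting a single vmstat column can look like, here is a minimal sketch that parses a made-up sample of vmstat output with base R. The sample text is hypothetical and the parsing approach is an assumption; the function itself may process the output differently.

```r
# Made-up sample of `vmstat` output: a group header, column names, one data row
vmOut <- c(
  "procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----",
  " r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st",
  " 2  0      0 812345  10240 456789    0    0     3     7  150  300 42  3 54  1  0"
)

# Skip the group header and read the remaining rows as a whitespace-separated table
vm <- read.table(text = vmOut[-1], header = TRUE)

resource <- "us"        # user CPU time, the default column in the usage above
value <- vm[[resource]]
print(value)
```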
This is for cleaning up cases where an interrupted optimization is leading to multiple files for the same .runName. This will remove duplicates, keeping only the most recent.
rmIncompleteDups(path, pattern = "^(.+)\\_[[:digit:]]{5,8}.*\\.png", delete = FALSE)
path |
A folder in which to search for duplicates |
pattern |
The regular expression to search for, to identify the files. This must have 1 set of parentheses (), as only the content between the () will be used for duplicate assessment, i.e., remove anything in the file that shouldn't be used. |
delete |
Logical. Default FALSE. |
For side effects: removed files
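The "keep only the most recent" idea can be sketched with base R as below. The helper findOlderDups and the file names are hypothetical, and the logic is an assumption about one reasonable way to do it rather than the package's code. With delete = FALSE it only reports what would be removed.

```r
# Hypothetical helper (not part of the package): find older duplicates by key
findOlderDups <- function(path, pattern, delete = FALSE) {
  files  <- list.files(path, full.names = TRUE)
  files  <- files[grepl(pattern, basename(files))]
  keys   <- sub(pattern, "\\1", basename(files))   # text inside the ()
  ord    <- order(file.mtime(files), decreasing = TRUE)
  # With newest first, duplicated() flags every older copy of a key
  older  <- files[ord][duplicated(keys[ord])]
  if (isTRUE(delete)) file.remove(older)
  older
}

# Demonstration in a temporary folder with made-up file names
tmp <- tempfile("dups_")
dir.create(tmp)
f <- file.path(tmp, c("runA_12345.png", "runA_67890.png", "runB_11111.png"))
file.create(f)

# Set explicit modification times so runA_67890.png is the newer runA file
ages <- c(120, 60, 30)
now  <- Sys.time()
for (i in seq_along(f)) Sys.setFileTime(f[i], now - ages[i])

pattern  <- "^(.+)\\_[[:digit:]]{5,8}.*\\.png"
toRemove <- findOlderDups(tmp, pattern, delete = FALSE)
print(basename(toRemove))
```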
Runs test on each machine in hosts
runTests(hosts, repos = c("predictiveecology.r-universe.dev", getOption("repos")), clustersBranch = "main", RscriptPath = "/usr/local/bin/Rscript", Npops = 100)
A list; the same as the return value of getHostCombination.
Convenient wrapper around dirNew and tableFiles.
summaryOutputFolder(path, pattern = "^.+hists/(.+)\\_iter.+\\_[[:digit:]]{6,8}.*\\.png")
path |
A folder in which to search for duplicates |
pattern |
The regular expression to base the |
Tabulation of the files, based on the pattern.
pattern
Using a regular expression, with a single (.+) identifying the parts of
the filenames to keep, and therefore to base the table on. Everything
before
tableFiles(files, pattern = "^.+hists/(.+)\\_iter.+\\_[[:digit:]]{6,8}.*\\.png")
files |
A vector of full filenames |
pattern |
The regular expression to base the |
Tabulation of the files, based on the pattern.
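A minimal sketch of tabulating files by the captured part of the default pattern, using base R only. The file paths are hypothetical and this is an illustration of the idea, not the function's source.

```r
# Default pattern from the usage above
pattern <- "^.+hists/(.+)\\_iter.+\\_[[:digit:]]{6,8}.*\\.png"

# Hypothetical full file names, for illustration only
files <- c(
  "out/hists/modelA_iter10_123456.png",
  "out/hists/modelA_iter20_654321.png",
  "out/hists/modelB_iter10_111111.png"
)

# Keep only the part matched by (.+), then count files per key
keys <- sub(pattern, "\\1", files)
tab  <- table(keys)
print(tab)
```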
termsInDEoptim
termsInDEoptim(fireSense_spreadFormula, thresh, numParams)
fireSense_spreadFormula |
The formula to be submitted to |
thresh |
The threshold for accepting fits; e.g., from |
numParams |
The number of parameters (TODO: improve description) |
Test machines
testMachine(NcoresMax = parallel::detectCores(), N = 100, thinning = 5)

    # example code
    hosts <- c("97", "106", "184", "189", "213", "217", "220")#, "102")
    hosts <- makeHosts(ipbase = "spades", hosts)
    hosts <- c(hosts, "132.156.148.105", "localhost")
    cl <- parallelly::makeClusterPSOCK(hosts, rscript = c("nice", "/usr/local/bin/Rscript"))
    parallel::clusterExport(cl, c("testMachine", "pkgs"))
    parallel::clusterEvalQ(cl, {
      if (!require("Require"))
        install.packages("Require",
                         repos = c("https://predictiveecology.r-universe.dev", getOption("repos")))
      Require::Require(pkgs)
    })
    st <- system.time(outs <- parallel::clusterApply(cl, seq_along(cl), function(x)
      testMachine(N = 1, NcoresMax = 10, thinning = 3)))
    names(outs) <- hosts
    print(st)
    getHostCombination(outs, Npops = 100)
Make histograms of DEoptim object pars
visualizeDE(DE, cachePath, titles, lower, upper)
DE |
An object from a |
cachePath |
A |