--- title: "Getting Started with taskqueue" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Getting Started with taskqueue} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ## Introduction `taskqueue` is an R package for asynchronous parallel computing based on PostgreSQL database. It is designed to dynamically allocate tasks to workers, efficiently utilizing all available computing resources until all tasks are completed. This package is suitable for [Embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel) problems - parallel computing without any communication among parallel tasks. ## Prerequisites Before using `taskqueue`, ensure you have: 1. PostgreSQL installed and configured (see [PostgreSQL Setup](postgresql-setup.html) vignette) 2. SSH access configured for remote resources (see [SSH Setup](ssh-setup.html) vignette) ## Installation Install the development version from GitHub: ```r devtools::install_github('byzheng/taskqueue') ``` ## Basic Workflow ### 1. Initialize Database ```r library(taskqueue) # Initialize the database structure db_init() ``` ### 2. Define a Resource A computing resource is a facility/computer that can run multiple jobs/workers. ```r resource_add( name = "hpc", type = "slurm", host = "hpc.example.com", nodename = "hpc", workers = 500, log_folder = "/home/user/log_folder/" ) ``` **Parameters:** - `name`: Resource name - `type`: Resource type (currently only `slurm` is supported) - `host`: Network name to access the resource - `nodename`: Obtained by `Sys.info()` on the resource - `workers`: Maximum number of available cores - `log_folder`: Folder to store log files (important for troubleshooting) **Note:** `log_folder` is split by project. It's better to use a high-speed hard drive due to frequent I/O operations. ### 3. Create a Project `taskqueue` manages tasks by project. Each project has its own resources, working directory, runtime requirements, and configurations. ```r # Create a project with common requirements project_add("test_project", memory = 20) ``` ### 4. Assign Resource to Project ```r project_resource_add( project = "test_project", resource = "hpc" ) ``` ### 5. Add Tasks ```r # Add 100 tasks to the project task_add("test_project", num = 100, clean = TRUE) ``` ### 6. Develop Your Worker Function Create a function that: - Takes task `id` as the first argument - Expects no return values - Saves final output to the file system - Checks whether the task is already finished ```r library(taskqueue) fun_test <- function(i) { # Check output file out_file <- sprintf("%s.Rds", i) if (file.exists(out_file)) { return() } # Perform the actual computing # ... your computation here ... # Save final output saveRDS(i, out_file) } # Test locally worker("test_project", fun_test) ``` ### 7. 
### 7. Deploy to HPC

After developing and testing your function, save it to a file (e.g., `rcode.R`) and deploy it to your HPC resource:

```r
# Reset task status (if needed)
project_reset("test_project")

# Start the project
project_start("test_project")

# Schedule tasks on the slurm resource
worker_slurm("test_project", "hpc", "rcode.R", modules = "sqlite/3.43.1")

# Check task status
task_status("test_project")

# Stop the project when done
project_stop("test_project")
```

## Task Status

Each task has one of four statuses:

- **`idle`**: The task is not running
- **`working`**: The task is currently running on a worker
- **`failed`**: The task failed for some reason (check the log folder for troubleshooting)
- **`finished`**: The task completed without errors

## Common Operations

### Check Task Status

```r
task_status("test_project")
```

### Reset Tasks

Reset all tasks in a project:

```r
project_reset("test_project")
```

Reset only failed or working tasks:

```r
project_reset("test_project", status = "failed")
project_reset("test_project", status = "working")
```

### Manage Projects

```r
# List all projects
project_list()

# Get project details
project_get("test_project")

# Delete a project
project_delete("test_project")
```

## Example: Complete Workflow

Here is a complete example of using `taskqueue`:

```r
library(taskqueue)

# 1. Initialize the database (first time only)
db_init()

# 2. Add a resource
resource_add(
    name = "my_hpc",
    type = "slurm",
    host = "hpc.university.edu",
    nodename = "hpc",
    workers = 200,
    log_folder = "/home/user/taskqueue_logs/"
)

# 3. Create a project
project_add("simulation_study", memory = 16)

# 4. Assign the resource to the project
project_resource_add(project = "simulation_study", resource = "my_hpc")

# 5. Add tasks
task_add("simulation_study", num = 1000, clean = TRUE)

# 6. Create the worker function (save as worker_script.R)
run_simulation <- function(task_id) {
    out_file <- sprintf("results/sim_%04d.Rds", task_id)
    if (file.exists(out_file)) return()

    # Run your simulation
    result <- your_simulation_function(task_id)

    # Save the results
    saveRDS(result, out_file)
}

# Make sure the output folder exists before testing locally
dir.create("results", showWarnings = FALSE)
worker("simulation_study", run_simulation)

# 7. Deploy to HPC
project_start("simulation_study")
worker_slurm("simulation_study", "my_hpc", "worker_script.R")

# 8. Monitor progress
task_status("simulation_study")

# 9. Stop when complete
project_stop("simulation_study")
```

## Tips

1. **Test Locally First**: Always test your worker function locally with `worker()` before deploying to HPC
2. **Check Logs**: If tasks fail, check the log folder for error messages
3. **Idempotent Functions**: Make your worker functions idempotent (safe to run multiple times) by checking whether the output already exists
4. **Resource Monitoring**: Use `task_status()` regularly to monitor progress
5. **Clean Start**: Use `project_reset()` to restart failed tasks (see the sketch after this list)
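Tips 2, 4, and 5 combine into a short recovery recipe built only from the functions documented above. This is a minimal sketch; it assumes the earlier run has been stopped and the cause of the failures has already been fixed:

```r
library(taskqueue)

# Inspect overall progress; failed tasks also leave messages in the
# resource's log folder
task_status("simulation_study")

# Put only the failed tasks back in the queue
project_reset("simulation_study", status = "failed")

# Start a fresh round of workers for the remaining tasks
project_start("simulation_study")
worker_slurm("simulation_study", "my_hpc", "worker_script.R")

# Stop the project once everything is finished
task_status("simulation_study")
project_stop("simulation_study")
```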