Parallelization using dSQ
Overview
Parallelization is helpful when you need to run a time- or memory-intensive script many times, for example once per state. Rather than looping through the states one at a time, parallelization runs multiple states at the same time, so the full job finishes much faster. In this document, I will show how to convert a typical Python script that loops over states into something that can be run in parallel.
Note: Parallelization is, of course, helpful for more things than just running a bunch of states at once. See this resource to learn more about dSQ and its many capabilities.
Basic Example
When you write a Python script for parallelization, you structure your code slightly differently. For example, a typical Python script that loops over states might look something like this:
def my_function(state):
    # Your processing code here
    print(f"Processing state: {state}")

states = ["AL", "AK", "AZ", "AR", "..."]  # etc.

for state in states:
    my_function(state)
The same script written in a parallel-ready format would look like this:
import sys

def my_function(state):
    # Your processing code here
    print(f"Processing state: {state}")

if __name__ == "__main__":
    # The state is now passed as a command-line argument
    state_to_process = sys.argv[1]
    my_function(state_to_process)
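The same pattern extends naturally if your function needs more than one input. Here is a minimal sketch, not part of the example above, that also takes a year, assuming each job line passes both values on the command line (e.g. python my_script.py AL 2019):

import sys

def my_function(state, year):
    # Hypothetical variation: process one state for one year
    print(f"Processing state: {state}, year: {year}")

if __name__ == "__main__":
    # Both values arrive as command-line arguments
    state_to_process = sys.argv[1]
    year_to_process = int(sys.argv[2])
    my_function(state_to_process, year_to_process)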
The parallel-ready script needs to be used in conjunction with a joblist.txt file that looks like this (assuming your parallel-ready .py file is called my_script.py):
python my_script.py AL
python my_script.py AK
python my_script.py AZ
python my_script.py AR
# ... and so on for all your states
In our context, you'd want to load miniconda and activate your environment too, so each line might look something like this:
module load miniconda; conda activate my_env; python my_script.py AL
module load miniconda; conda activate my_env; python my_script.py AK
# ... etc.
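Typing one line per state gets tedious, so you may prefer to generate the job file programmatically. Below is a minimal sketch, assuming the my_env environment and my_script.py names from the example above:

# make_joblist.py: write one job line per state to joblist.txt
states = ["AL", "AK", "AZ", "AR"]  # extend with the rest of your states

with open("joblist.txt", "w") as f:
    for state in states:
        f.write(
            f"module load miniconda; conda activate my_env; "
            f"python my_script.py {state}\n"
        )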
Next, go to your command line connected to the YCRC clusters and load dSQ onto your path with module load dSQ. Then we want to tell dSQ to make our batch script. For example, we might run
dsq --job-file joblist.txt --mem-per-cpu 4g -t 20:00 --mail-type ALL
to request 4 gigabytes of memory and 20 minutes per job. See more on the settings you can specify here. This will create a script called dsq-joblist-yyyy-mm-dd.sh, where yyyy-mm-dd is today's date. The script will look something like this:
#!/bin/bash
#SBATCH --array 0-999
#SBATCH --output dsq-joblist-%A_%3a-%N.out
#SBATCH --job-name dsq-joblist
#SBATCH --mem-per-cpu 4g -t 20:00 --mail-type ALL
# DO NOT EDIT LINE BELOW
/path/to/dSQBatch.py --job-file /path/to/joblist.txt --status-dir /path/to/here
The --array range matches the number of lines in your job file (one array index per job). Note that if you remove -%A_%3a-%N from the --output file name, all the output will go into one file instead of a separate file for each job.
Finally, make sure all of these files (my_script.py, joblist.txt, and dsq-joblist-yyyy-mm-dd.sh) are in your desired folder and submit your batch script using sbatch dsq-joblist-yyyy-mm-dd.sh.
If you need more customization or run into errors, YCRC has some great suggestions for how to get started and how to diagnose problems.
Note: Unfortunately, there is no way to give individual jobs in the same array different amounts of memory, meaning that if you run all states from one job file you'd have to assign CA and HI the same amount. One potential workaround is to create multiple job files, for example a "big state" list and a "small state" list, and submit each with its own memory request, as sketched below.
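As a rough sketch of that workaround, you could write the job lines into two separate files and then run dsq on each with a different --mem-per-cpu. The state groupings and file names here are just placeholders:

# split_joblist.py: write separate job files for large and small states
big_states = ["CA", "TX", "NY", "FL"]    # placeholder grouping
small_states = ["HI", "VT", "WY", "AK"]  # placeholder grouping

def write_joblist(filename, states):
    # One job line per state, mirroring the joblist format above
    with open(filename, "w") as f:
        for state in states:
            f.write(
                f"module load miniconda; conda activate my_env; "
                f"python my_script.py {state}\n"
            )

write_joblist("joblist_big.txt", big_states)
write_joblist("joblist_small.txt", small_states)

You would then run dsq twice, once per job file, passing a larger --mem-per-cpu for the big-state list.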
Another Example
Below is the group of files I made to run a very simple function that imports the demographic file and reports the number of unique BENE_IDs. This implementation is for five states, but it could obviously be run for more.
parallel_test.py
import pyarrow.parquet as pq
import pyarrow.compute as pc
import pyarrow as pa
import sys
p = '/home/as4765/medicaid_lab/data/cms/ingested/TMSIS_taf/taf_demog_elig_base/'
years = range(2016,2022)
def function(state):
    print(f'Processing state: {state}')
    for year in years:
        schema = pq.read_schema(f'{p}year={year}/state={state}/data.parquet')
        new_schema = schema.set(schema.get_field_index('year'), pa.field('year', pa.int64()))
        table = pq.read_table(f'{p}/year={year}/state={state}/data.parquet',
                              columns=['BENE_ID'],
                              schema=new_schema)
        unique_ids = pc.unique(table['BENE_ID'])
        print(f'Year: {year}, State: {state}, Rows: {len(unique_ids)}')

if __name__ == "__main__":
    state_to_process = sys.argv[1]
    function(state_to_process)
joblist_parallel_test.txt
module load miniconda; conda activate test_environ; python3 ./parallel_test.py AZ
module load miniconda; conda activate test_environ; python3 ./parallel_test.py HI
module load miniconda; conda activate test_environ; python3 ./parallel_test.py MO
module load miniconda; conda activate test_environ; python3 ./parallel_test.py NH
module load miniconda; conda activate test_environ; python3 ./parallel_test.py WA
The dSQ call used to generate the batch script:
dsq --job-file joblist_parallel_test.txt --mem-per-cpu 20g -t 20:00 --mail-type ALL
dsq-joblist_parallel_test-2025-06-26.sh
#!/bin/bash
#SBATCH --output dsq-joblist_parallel_test.out
#SBATCH --array 0-4
#SBATCH --job-name dsq-joblist_parallel_test
#SBATCH --mem-per-cpu 20g -t 20:00 --mail-type ALL
# DO NOT EDIT LINE BELOW
/gpfs/milgram/apps/hpc.rhel7/software/dSQ/1.05/dSQBatch.py --job-file /gpfs/milgram/project/ndumele/as4765/misc/joblist_parallel_test.txt --status-dir /gpfs/milgram/project/ndumele/as4765/misc
Considerations in R
The theory behind parallel processing is the same in R, but the tools are different. The developers of purrr are working on a function, purrr::in_parallel(), to facilitate parallelization of loops. In the meantime, you can use the furrr package. If you're building a targets pipeline, follow their parallelization guide.