diff --git a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/main.R b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/main.R
index 0f9243cc..2eb6423b 100644
--- a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/main.R
+++ b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/main.R
@@ -300,6 +300,7 @@ mainAgeing = function(file = NULL
       , cpu = cpu
       , memory = memory
       , time = time ,
+      jobname = paste0("impc_job_", DRversion),
       extraBatchParameters = extraBatchParameters
     )
     write(
diff --git a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/sideFunctions.R b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/sideFunctions.R
index 13cacd5c..e1ec3174 100644
--- a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/sideFunctions.R
+++ b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/R/sideFunctions.R
@@ -688,6 +688,7 @@ BatchGenerator = function(file
   , cpu = 1
   , memory = "8G"
   , time = "10:00:00" ,
+  jobname = NULL ,
   extraBatchParameters = NULL) {
   dirOut = file.path(dir, 'ClusterOut')
   dirErr = file.path(dir, 'ClusterErr')
@@ -701,7 +702,7 @@ BatchGenerator = function(file
   ro = paste(' -o ', paste0('"', oname, '.ClusterOut', '"'), sep = '')
   re = paste(' -e ', paste0('"', ename, '.ClusterErr', '"'), sep = '')
   rf = paste(
-    "sbatch --job-name=impc_stats_pipeline_job --mem=", memory,
+    "sbatch --job-name=", jobname, " --mem=", memory,
     " --time=", time, extraBatchParameters
     , ' --cpus-per-task='
     ,
diff --git a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/jobs/InputDataGenerator.R b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/jobs/InputDataGenerator.R
index ce241a4e..cdf352d0 100644
--- a/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/jobs/InputDataGenerator.R
+++ b/Late adults stats pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/jobs/InputDataGenerator.R
@@ -20,7 +20,8 @@ generate_data <- function(args, thresh = 4) {
     controlSize = 1500,
     extraBatchParameters = NULL,
     combineEAandLA = FALSE,
-    solrBaseURL = NULL
+    solrBaseURL = NULL,
+    DRversion = args[3]
   )
   trash <- NULL
   gc()
diff --git a/annotation_pipeline/loader.py b/annotation_pipeline/loader.py
index 1427e686..3b4c9310 100644
--- a/annotation_pipeline/loader.py
+++ b/annotation_pipeline/loader.py
@@ -86,12 +86,12 @@ def main():
         file_list = [line.strip() for line in f]
     total_files = len(file_list)

-    # Store StatPackets temporary.
-    output_dir = Path("tmp")
+    # Store StatPackets.
+    output_dir = Path("../annotation_pipeline_output")
     if not output_dir.exists():
         output_dir.mkdir()

-    output_file = output_dir / (file_list_path.name + "_.statpackets")
+    output_file = output_dir / (file_list_path.name + ".statpackets")

     for i, file in enumerate(file_list):
         file_path = Path(file)
diff --git a/orchestration/orchestration.sh b/orchestration/orchestration.sh
index e50584d8..23167a92 100755
--- a/orchestration/orchestration.sh
+++ b/orchestration/orchestration.sh
@@ -18,6 +18,7 @@ WINDOWING_PIPELINE=${7:-"true"}

 # Redirect all output and errors to the log file.
 LOGFILE=${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_logs/orchestration_${VERSION}.log
+JOBNAME="impc_job_${VERSION}"
 exec > >(tee -a "$LOGFILE") 2>&1
 echo "Starting pipeline run. Data release $VERSION. Fetching packages from $REMOTE / $BRANCH."
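The R and shell changes above work together: BatchGenerator() gains a jobname argument, mainAgeing() passes paste0("impc_job_", DRversion), and the orchestration script derives the matching JOBNAME="impc_job_${VERSION}". A minimal sketch of the effect, assuming an illustrative VERSION value and a hypothetical analysis.R payload (the polling interval is also illustrative): because the completion check greps squeue output for the job name, per-release names keep one data release's wait loop from blocking on another release's jobs.

    VERSION="23.0"                # illustrative; the real script receives this as an argument
    JOBNAME="impc_job_${VERSION}"
    sbatch --job-name="${JOBNAME}" --mem=8G --time=10:00:00 --wrap='Rscript analysis.R'
    # Wait only for this release's jobs:
    while squeue --format="%A %.30j" | grep -q "$JOBNAME"; do
      sleep 60
    done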
@@ -33,7 +34,7 @@ function message0() {
 # Function to wait until all jobs on SLURM complete.
 function waitTillCommandFinish() {
   while true; do
-    if ! (squeue --format="%A %.30j" | grep -q "impc_stats_pipeline_job"); then
+    if ! (squeue --format="%A %.30j" | grep -q "$JOBNAME"); then
       message0 "Done waiting for SLURM jobs to complete."
       break
     fi
@@ -76,7 +77,8 @@ function submit_limit_jobs() {
 # Preparation.
 mkdir --mode=775 ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
 cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
-cp ${PARQUET_FOLDER}/*.parquet ./
+mkdir input_parquet_files
+cp ${PARQUET_FOLDER}/*.parquet ./input_parquet_files
 cp ${MP_CHOOSER_FOLDER}/part*.txt ./mp_chooser.json

 message0 "Update packages to the latest version"
@@ -87,95 +89,93 @@ message0 "Update completed"

 # Statistical pipeline.
 message0 "Starting the IMPC statistical pipeline..."
-mkdir SP compressed_logs
+mkdir 01_batching compressed_logs
 export input_path=$(realpath .)
-export sp_results=$(realpath SP)
+export sp_results=$(realpath 01_batching)
 message0 "Parquet files path: ${input_path}"
 message0 "Output path: ${sp_results}"
-cd SP
+cd 01_batching

 message0 "Phase I. Convert parquet files into Rdata..."
 message0 "Step 1. Create jobs"
-step1_files=$(find .. -type f -name '*.parquet' -exec realpath {} \;)
+step1_files=$(find ../input_parquet_files -type f -name '*.parquet' -exec realpath {} \;)
 for file in $step1_files; do
-  echo "sbatch --job-name=impc_stats_pipeline_job --mem=10G --time=00:10:00 -e ${file}.err -o ${file}.log --wrap='Rscript Step2Parquet2Rdata.R $file'" >> jobs_step2_Parquet2Rdata.bch
+  file_name=$(basename "${file}" .parquet)
+  echo "sbatch --job-name=${JOBNAME} --mem=10G --time=00:10:00 -e ../compressed_logs/step2_logs/${file_name}.err -o ../compressed_logs/step2_logs/${file_name}.log --wrap='Rscript Step2Parquet2Rdata.R $file'" >> jobs_step2_Parquet2Rdata.bch
 done

 message0 "Step 2. Read parquet files and create pseudo Rdata"
 fetch_script 0-ETL/Step2Parquet2Rdata.R
-sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/step2_job_id.txt --wrap="bash jobs_step2_Parquet2Rdata.bch"
+sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/step2_job_id.txt --wrap="bash jobs_step2_Parquet2Rdata.bch"
 waitTillCommandFinish
 rm Step2Parquet2Rdata.R
-find ../ -type f -name '*.log' -exec zip -q -m ../compressed_logs/step2_logs.zip {} +
-find ../ -type f -name '*.err' -exec zip -q -m ../compressed_logs/step2_logs.zip {} +
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_step2.txt --wrap="zip -r -m -q ../compressed_logs/step2_logs.zip ../compressed_logs/step2_logs/"

 message0 "Step 3. Merging pseudo Rdata files into single file for each procedure - jobs creator"
-dirs=$(find "${sp_results}/ProcedureScatterRdata" -maxdepth 1 -type d)
+dirs=$(find "${sp_results}/ProcedureScatterRdata" -maxdepth 1 -mindepth 1 -type d)
 for dir in $dirs; do
-  echo "sbatch --job-name=impc_stats_pipeline_job --mem=50G --time=01:30:00 -e ${dir}/step4_merge_rdatas.err -o ${dir}/step4_merge_rdatas.log --wrap='Rscript Step4MergingRdataFiles.R ${dir}'" >> jobs_step4_MergeRdatas.bch
+  file_name=$(basename "${dir}")
+  echo "sbatch --job-name=${JOBNAME} --mem=50G --time=01:30:00 -e ../compressed_logs/step4_logs/${file_name}_step4.err -o ../compressed_logs/step4_logs/${file_name}_step4.log --wrap='Rscript Step4MergingRdataFiles.R ${dir}'" >> jobs_step4_MergeRdatas.bch
 done

 message0 "Step 4. Merging pseudo Rdata files into single files per procedure"
 fetch_script 0-ETL/Step4MergingRdataFiles.R
-sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/step4_job_id.txt --wrap="bash jobs_step4_MergeRdatas.bch"
+sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/step4_job_id.txt --wrap="bash jobs_step4_MergeRdatas.bch"
 waitTillCommandFinish
 rm Step4MergingRdataFiles.R
-find . -type f -name '*.log' -exec zip -q -m ../compressed_logs/step4_logs.zip {} +
-find . -type f -name '*.err' -exec zip -q -m ../compressed_logs/step4_logs.zip {} +
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_step4.txt --wrap="zip -r -m -q ../compressed_logs/step4_logs.zip ../compressed_logs/step4_logs/"

 message0 "Phase I. Compressing the log files and house cleaning..."
 zip -q -rm ../compressed_logs/phase1_jobs.zip *.bch
 rm -rf ProcedureScatterRdata

 message0 "Starting Phase II, packaging the big data into small packages ..."
-mkdir DataGeneratingLog
 for file in $(find Rdata -type f -exec realpath {} \;); do
   file_basename=$(basename $file .Rdata)
-  echo "sbatch --job-name=impc_stats_pipeline_job --mem=45G --time=6-00 -e DataGeneratingLog/${file_basename}_errorlog.log -o DataGeneratingLog/${file_basename}_outputlog.log --wrap='Rscript InputDataGenerator.R ${file} ${file_basename}'" >> DataGenerationJobList.bch
+  echo "sbatch --job-name=${JOBNAME} --mem=45G --time=6-00 -e ../compressed_logs/phase2_logs/${file_basename}.err -o ../compressed_logs/phase2_logs/${file_basename}.log --wrap='Rscript InputDataGenerator.R ${file} ${file_basename} ${VERSION}'" >> DataGenerationJobList.bch
 done
 fetch_script jobs/InputDataGenerator.R
-sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/phase2_job_id.txt --wrap="bash DataGenerationJobList.bch"
+sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/phase2_job_id.txt --wrap="bash DataGenerationJobList.bch"
 waitTillCommandFinish
 rm InputDataGenerator.R
 message0 "End of packaging data."

 message0 "Phase II. Compressing the log files and house cleaning..."
-mv *.bch DataGeneratingLog/
-zip -q -rm phase2_logs.zip DataGeneratingLog/
-mv phase2_logs.zip ../compressed_logs/
+mv *.bch ../compressed_logs/phase2_logs/
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_phase2.txt --wrap="zip -r -m -q ../compressed_logs/phase2_logs.zip ../compressed_logs/phase2_logs/"

 message0 "Appending all procedure based jobs into one single file..."
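The recurring compress_logs submissions above, and in the phases that follow, implement one pattern: each step writes its .err/.log files into a per-step directory under compressed_logs, and zipping is submitted as a detached SLURM job instead of running inline in the orchestration shell. A minimal sketch of that pattern with an illustrative step name; note that the job is deliberately named compress_logs rather than ${JOBNAME}, so waitTillCommandFinish does not block on compression before moving to the next phase.

    step="step2"  # illustrative
    sbatch --job-name=compress_logs --time=15:00:00 --mem=1G \
      -o "../compressed_logs/zip_${step}.txt" \
      --wrap="zip -r -m -q ../compressed_logs/${step}_logs.zip ../compressed_logs/${step}_logs/"
    # zip -m deletes the originals once archived, so the log directory cleans itself up.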
-mkdir jobs
-find ./*/*_RawData/*.bch -type f | xargs cat >> jobs/AllJobs.bch
+mkdir ../02_sp_output
+find ./*/*_RawData/*.bch -type f | xargs cat >> ../02_sp_output/phase3_jobs.bch

 message0 "Phase III. Initialising the statistical analysis..."
-cd jobs
+cd ../02_sp_output
 message0 "Updating the dynamic contents from the IMPReSS..."
-R --quiet -e \
-"DRrequiredAgeing:::updateImpress( \
-  updateImpressFileInThePackage = TRUE, \
-  updateOptionalParametersList = TRUE, \
-  updateTheSkipList = TRUE, \
-  saveRdata = FALSE \
-)"
+# R --quiet -e \
+# "DRrequiredAgeing:::updateImpress( \
+#   updateImpressFileInThePackage = TRUE, \
+#   updateOptionalParametersList = TRUE, \
+#   updateTheSkipList = TRUE, \
+#   saveRdata = FALSE \
+# )"

 message0 "Running the IMPC statistical pipeline by submitting jobs..."
 if [ "${WINDOWING_PIPELINE}" = true ]; then
   fetch_script jobs/function_windowed.R
   mv function_windowed.R function.R
 else
-  fetch_script jobs/function.R
+  fetch_script function.R
 fi
-
 R --quiet -e \
 "DRrequiredAgeing:::ReplaceWordInFile( \
   '$(realpath function.R)', \
   'DRversionNotSpecified', \
   ${VERSION} \
 )"
-chmod 775 AllJobs.bch
-submit_limit_jobs AllJobs.bch ../../compressed_logs/phase3_job_id.txt
+
+chmod 775 phase3_jobs.bch
+submit_limit_jobs phase3_jobs.bch ../compressed_logs/phase3_job_id.txt
 waitTillCommandFinish

 message0 "Postprocessing the IMPC statistical analysis results..."
@@ -194,56 +194,55 @@ R --quiet -e \
   storepath='$(realpath RPackage_backup)' \
 )"

-message0 "Compress phase III log files"
-find . -type f -name '*.ClusterOut' -exec zip -q -m ../compressed_logs/phase3_logs.zip {} +
-message0 "Compress phase III error files"
-find . -type f -name '*.ClusterErr' -exec zip -q -m ../compressed_logs/phase3_errs.zip {} +
+message0 "Submit phase III log and err files compression"
+sbatch --job-name=compress_logs --time=1-00:00:00 --mem=1G -o ../compressed_logs/zip_phase3_logs.txt --wrap="find . -type d -name 'ClusterOut' -exec zip -q -r -m ../compressed_logs/phase3_logs.zip {} \;"
+sbatch --job-name=compress_logs --time=1-00:00:00 --mem=1G -o ../compressed_logs/zip_phase3_errs.txt --wrap="find . -type d -name 'ClusterErr' -exec zip -q -r -m ../compressed_logs/phase3_errs.zip {} \;"
 message0 "This is the last step. If you see no file in the list below, the SP is successfully completed."

 # Annotation pipeline.
 message0 "Starting the IMPC annotation pipeline..."
-cd jobs/Results_IMPC_SP_Windowed/
+cd ../02_sp_output/Results_IMPC_SP_Windowed/
 message0 "Step 1: Clean ups and creating the global index for the results."
 message0 "Indexing the results..."
 for dir in $(find . -mindepth 2 -maxdepth 2 -type d); do
   base_dir=$(basename "$dir")
   output_file="FileIndex_${base_dir}_$(printf "%.6f" $(echo $RANDOM/32767 | bc -l)).Ind"
-  echo "sbatch --job-name=impc_stats_pipeline_job --mem=1G --time=2-00 \
--e ${base_dir}_error.err -o ${base_dir}_output.log --wrap=\"find $dir -type f -name '*.tsv' -exec realpath {} \; > $output_file\"" >> minijobs.bch
+  echo "sbatch --job-name=${JOBNAME} --mem=1G --time=2-00 \
+-e ../../compressed_logs/minijobs_logs/${base_dir}.err -o ../../compressed_logs/minijobs_logs/${base_dir}.log \
+--wrap=\"find $dir -type f -name '*.tsv' -exec realpath {} \; > $output_file\"" >> minijobs.bch
 done

 chmod 775 minijobs.bch
-submit_limit_jobs minijobs.bch ../../../compressed_logs/minijobs_job_id.txt
+submit_limit_jobs minijobs.bch ../../compressed_logs/minijobs_job_id.txt
 waitTillCommandFinish
-mv minijobs.bch ../../../compressed_logs
+mv minijobs.bch ../../compressed_logs
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../../compressed_logs/zip_minijobs.txt --wrap="zip -r -m -q ../../compressed_logs/minijobs_logs.zip ../../compressed_logs/minijobs_logs/"

-find . -type f -name '*_output.log' -exec zip -q -m ../../../compressed_logs/minijobs_logs.zip {} +
-find . -type f -name '*_error.err' -exec zip -q -m ../../../compressed_logs/minijobs_logs.zip {} +
-message0 "Moving single indeces into a separate directory called AnnotationExtractorAndHadoopLoader..."
-mkdir AnnotationExtractorAndHadoopLoader
-chmod 775 AnnotationExtractorAndHadoopLoader
-mv *.Ind AnnotationExtractorAndHadoopLoader
-cd AnnotationExtractorAndHadoopLoader
+message0 "Moving single indices into a separate directory called 03_indices_and_splits..."
+mkdir ../../03_indices_and_splits
+chmod 775 ../../03_indices_and_splits
+cd ../../03_indices_and_splits
+mv ../02_sp_output/Results_IMPC_SP_Windowed/*.Ind .

 message0 "Concatenating single index files to create a global index for the results..."
-cat *.Ind >> AllResultsIndeces.txt
-message0 "Zipping the single indeces..."
-zip -q -rm allsingleindeces.zip *.Ind
-split -50 AllResultsIndeces.txt split_index_
+cat *.Ind | shuf --random-source=<(yes "42") >> global_results_index.txt
+message0 "Zipping the single indices..."
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_indices.txt --wrap="zip -r -m -q individual_indices.zip *.Ind"
+split -1000 global_results_index.txt split_index_

 message0 "Convert the mp_chooser JSON file to Rdata..."
-R --quiet -e "a = jsonlite::fromJSON('../../../../mp_chooser.json');save(a,file='../../../../mp_chooser.json.Rdata')"
-export MP_CHOOSER_FILE=$(realpath ../../../../mp_chooser.json.Rdata | tr -d '\n')
+R --quiet -e "a = jsonlite::fromJSON('../mp_chooser.json');save(a,file='../mp_chooser.json.Rdata')"
+export MP_CHOOSER_FILE=$(realpath ../mp_chooser.json.Rdata | tr -d '\n')
 if [[ -z "${MP_CHOOSER_FILE}" || ! -f "${MP_CHOOSER_FILE}" ]]; then
   echo -e "ERROR: mp_chooser not found at location\n\t${MP_CHOOSER_FILE}"
   exit 1
 fi

-mkdir err log out
+message0 "Generate annotation jobs..."
 for file in $(find . -maxdepth 1 -type f -name "split_index*"); do
-  echo "sbatch --job-name=impc_stats_pipeline_job --mem=5G --time=2-00 \
-  -e err/$(basename "$file").err -o out/$(basename "$file").out --wrap='python3 loader.py $(basename "$file") ${MP_CHOOSER_FILE}'" >> annotation_jobs.bch
+  echo "sbatch --job-name=${JOBNAME} --mem=5G --time=2-00 \
+-e ../compressed_logs/annotation_logs/$(basename "$file").err -o ../compressed_logs/annotation_logs/$(basename "$file").out --wrap='python3 loader.py $(basename "$file") ${MP_CHOOSER_FILE}'" >> annotation_jobs.bch
 done

 chmod 775 annotation_jobs.bch
@@ -253,13 +252,13 @@ python3.10 -m pip install rpy2
 python3.10 -m pip install numpy
 python3.10 -m pip install pandas

-message0 "Downloading the action script..."
+message0 "Downloading the action script loader.py..."
 fetch_script loader.py annotation_pipeline
-submit_limit_jobs annotation_jobs.bch ../../../../compressed_logs/annotation_job_id.txt
+submit_limit_jobs annotation_jobs.bch ../compressed_logs/annotation_job_id.txt
 waitTillCommandFinish

-message0 "Zipping logs..."
-mv annotation_jobs.bch ../../../../compressed_logs
-zip -q -rm ../../../../compressed_logs/annotation_logs.zip log/* err/* out/*
-zip -q -rm splits.zip split_index_*
+message0 "Running Slurm jobs to compress logs..."
+mv annotation_jobs.bch ../compressed_logs
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_annotations.txt --wrap="zip -r -m -q ../compressed_logs/annotation_logs.zip ../compressed_logs/annotation_logs/"
+sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_splits.txt --wrap="find . -type f -name 'split_index_*' -exec zip -q -m splits.zip {} +"

 message0 "Job done."
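Two of the replacement commands in the indexing hunk deserve a note. shuf --random-source=<(yes "42") draws its randomness from a constant byte stream, so the global index receives the same permutation on every rerun and the job-to-chunk assignment is reproducible; split -1000 is the old-style spelling of split -l 1000, cutting the index into 1000-line chunks named split_index_aa, split_index_ab, and so on, one per annotation loader job. A small sketch of the behaviour, using the file names from the patch:

    cat *.Ind | shuf --random-source=<(yes "42") >> global_results_index.txt  # deterministic shuffle
    split -1000 global_results_index.txt split_index_                         # 1000 lines per chunk
    ls split_index_* | wc -l   # number of loader jobs; rerunning yields identical chunks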