Skip to content

Commit 340bd14

Browse files
authored
Merge pull request #97 from mpi2/orchestration
Move orchestration from inside the pipeline to an external layer
2 parents adb1ff2 + 3ada56e commit 340bd14

File tree

8 files changed

+359
-729
lines changed

8 files changed

+359
-729
lines changed

IMPC annotation pipeline/loader.R

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
# Annotation loader script.
#
# Usage: Rscript loader.R <file> <mp_chooser_file>
#   file            - text file read line by line; each line is used as the
#                     path of one StatPacket file
#   mp_chooser_file - forwarded unchanged to
#                     DRrequiredAgeing:::annotationChooser()
#
# Every listed path that exists and whose name contains 'NotProcessed' or
# 'Successful' is read as a headerless tab-separated table, passed through
# annotationChooser() twice (default call, then the 'Windowed result' /
# 'WMPTERM' call), and column V20 of the result is appended as one line to
# tmp/<basename(file)>_.statpackets.
args <- commandArgs(trailingOnly = TRUE)
if (length(args) < 2) {
  stop("Two arguments are required: the file path and the mp_chooser_file.")
}
file <- args[1]
mp_chooser_file <- args[2]

# Load necessary libraries
library(data.table)
library(jsonlite)
library(rlist)
library(Tmisc)
library(rwebhdfs)

# Set levels (both are forwarded to annotationChooser())
level <- .0001
rrlevel <- .0001

# Start annotation pipeline
flist <- readLines(con = file)
lflist <- length(flist)

# Store StatPackets temporarily. showWarnings = FALSE makes this a no-op when
# the directory already exists, including when another job created it
# between a check and the create.
dir.create("tmp", showWarnings = FALSE)

tmplocalfile <- file.path('tmp', paste0(basename(file), '_.statpackets'))
# NOTE(review): results are appended, so a pre-existing file from an earlier
# run with the same input name is extended rather than replaced - confirm
# that this is what the orchestration layer expects.

# seq_along() yields an empty sequence for an empty list, whereas 1:lflist
# would iterate over c(1, 0).
for (i in seq_along(flist)) {
  # Use a dedicated name so the script argument `file` is not overwritten.
  packet_file <- flist[i]
  cat('\r', i, '/', lflist)
  cat("\n", i, "/", lflist, "~>", packet_file, "")

  # Only existing files whose path marks them as NotProcessed or Successful
  # are annotated; everything else is skipped silently.
  if (!file.exists(packet_file) ||
      !grepl(pattern = 'NotProcessed|Successful', x = packet_file)) {
    next
  }

  df <- data.table::fread(
    file = packet_file,
    header = FALSE,
    sep = '\t',
    quote = "",
    stringsAsFactors = FALSE
  )
  # Exactly one row of 20 columns is required.
  if (ncol(df) != 20 || nrow(df) != 1L) {
    message(
      'file ignored (expected 1 row x 20 columns, got ',
      nrow(df), ' x ', ncol(df), '): ',
      packet_file
    )
    next
  }

  # First pass with the default result/term keys.
  rN <- DRrequiredAgeing:::annotationChooser(
    statpacket = df,
    level = level,
    rrlevel = rrlevel,
    mp_chooser_file = mp_chooser_file
  )
  # Second pass on the output of the first, for the windowed result.
  rW <- DRrequiredAgeing:::annotationChooser(
    statpacket = rN$statpacket,
    level = level,
    rrlevel = rrlevel,
    resultKey = 'Windowed result',
    TermKey = 'WMPTERM',
    mp_chooser_file = mp_chooser_file
  )

  # Append column V20 of the result as a single line of text.
  write(
    paste0(as.character(rW$statpacket$V20), collapse = ''),
    file = tmplocalfile,
    append = TRUE
  )
}

gc()

IMPC annotation pipeline/loaderHadoop.R

Lines changed: 0 additions & 118 deletions
This file was deleted.

0 commit comments

Comments
 (0)