Draft
Changes from all commits
61 commits
27c0349
feat: Airflow task to generate chromosome-map.json
francisco-ebi Oct 15, 2025
fbcfe0f
feat: initial task to generate chromosome dirs and gene summaries
francisco-ebi Oct 15, 2025
469a877
Merge branch 'dev' into feature/impc-spa
francisco-ebi Oct 21, 2025
80cd700
feat: initial task to generate external-links files based on chromoso…
francisco-ebi Oct 23, 2025
5d28c4d
feat: rename functions
francisco-ebi Oct 23, 2025
0d6236e
chore: add init file to impc_spa dir
francisco-ebi Oct 23, 2025
d25f769
fix: impc_gene_external_links_mapper, repartition by mgi ID to ensure…
francisco-ebi Oct 27, 2025
fb7d9be
feat: generalize functions to partition data and create files
francisco-ebi Oct 28, 2025
0027298
feat: update gene external links task to use new functions
francisco-ebi Oct 28, 2025
9e1bb2a
Merge branch 'dev' into feature/impc-spa
francisco-ebi Oct 28, 2025
ea27b64
fix: add missing param, update gene external links mapper task
francisco-ebi Oct 28, 2025
e99bf96
WIP
francisco-ebi Oct 24, 2025
36f8204
feat: initial Airflow task to generate gene publications.json files
francisco-ebi Oct 27, 2025
a852a0e
feat: update gene publications mapper to use utils function
francisco-ebi Oct 28, 2025
695fa2d
WIP
francisco-ebi Oct 24, 2025
66eaf3c
feat: initial task to create gene-histopathology json files
francisco-ebi Oct 24, 2025
3a775f3
fix: ensure one file per gene
francisco-ebi Oct 27, 2025
1a8577e
fix: remove diseases mapper from this branch
francisco-ebi Oct 28, 2025
9c38c39
feat: update task to use utils function
francisco-ebi Oct 28, 2025
89626d8
feat: initial task to create gene images.json file
francisco-ebi Oct 27, 2025
1b7eeff
feat: update task with utils functions
francisco-ebi Oct 28, 2025
857ec79
fix: add filename
francisco-ebi Oct 28, 2025
76da083
feat: initial Airflow task to generate gene expression json files
francisco-ebi Oct 27, 2025
26581f4
feat: update task to use utils functions
francisco-ebi Oct 28, 2025
a46bf96
fix: add file name
francisco-ebi Oct 28, 2025
bc3ff63
feat: initial Airflow task to produce gene phenotypehits json files
francisco-ebi Oct 27, 2025
242725f
feat: update task to use utils functions
francisco-ebi Oct 28, 2025
bfb9d66
fix: add file name
francisco-ebi Oct 28, 2025
4e03918
Merge branch 'feature/impc-spa-gene-histopathology-mapper' into featu…
francisco-ebi Oct 28, 2025
05c59fb
Merge branch 'feature/impc-gene-images-mapper' into feature/impc-spa
francisco-ebi Oct 28, 2025
84281c1
Merge branch 'feature/impc-spa-gene-expression' into feature/impc-spa
francisco-ebi Oct 28, 2025
9840217
Merge branch 'feature/impc-spa-gene-phenotype-hits-mapper' into featu…
francisco-ebi Oct 28, 2025
f987c8a
feat: initial task that generates stats-results json files
francisco-ebi Oct 28, 2025
e1f9657
feat: improve write_partitioned_data function, add argument to mutate…
francisco-ebi Oct 29, 2025
01490be
feat: update gene publications mapper, use improved utils function
francisco-ebi Oct 29, 2025
326d166
feat: added task to generate phenotype summaries
francisco-ebi Oct 30, 2025
381e404
feat: added Airflow task to generate phenotype genotypehits json file
francisco-ebi Oct 30, 2025
682ebc4
chore: move imports outside functions
francisco-ebi Nov 4, 2025
3c9fa94
fix: use another variable name to store new file path
francisco-ebi Nov 4, 2025
5975309
feat: added Airflow task to generate order.json files
francisco-ebi Nov 4, 2025
e0ac173
Merge branch 'dev' into feature/impc-spa
francisco-ebi Nov 4, 2025
c2e7f48
feat: added Airflow task to generate allele mice json file
francisco-ebi Nov 6, 2025
57ec2a1
fix: gene allele mice mapper, add chromosome map asset to schedule list
francisco-ebi Nov 7, 2025
0ff8796
WIP: gene allele Es Cells mapper
francisco-ebi Nov 7, 2025
1661b56
WIP: gene allele tvp mapper
francisco-ebi Nov 7, 2025
8300071
Merge remote-tracking branch 'origin/dev' into feature/impc-spa
francisco-ebi Nov 10, 2025
f5e55dc
fix: set correct filename in gene allele tvp mapper
francisco-ebi Nov 10, 2025
64a7184
feat: gene allele ivp mapper task
francisco-ebi Nov 10, 2025
5809eff
feat: add gentar crispr report asset to data_ingestion
francisco-ebi Nov 10, 2025
e061a1b
feat: added gene allele crispr mapper Airflow task
francisco-ebi Nov 10, 2025
d0b148e
feat: gentar crispr report loader moved into a function in impc_spa m…
francisco-ebi Nov 10, 2025
0d3e305
feat: added histopathology datasets mapper Airflow task
francisco-ebi Nov 13, 2025
93cc684
Merge branch 'dev' into feature/impc-spa
francisco-ebi Nov 14, 2025
7c06405
feat: update Allele tasks to write files in correct location
francisco-ebi Nov 19, 2025
341d868
WIP: airflow task to generate allele summary json files
francisco-ebi Nov 20, 2025
e1ff050
feat: airflow task to generate image dir structure and control/mutant…
francisco-ebi Nov 20, 2025
7bf6bce
feat: add Airflow task for gene significant phenotypes
francisco-ebi Nov 24, 2025
dd85001
WIP: all phenotype data mapper task
francisco-ebi Dec 4, 2025
623890c
feat: initial all phenotype data mapper task
francisco-ebi Dec 4, 2025
c5dfbbb
Merge branch 'feature/impc-spa-diseases-mapper' into feature/impc-spa
francisco-ebi Dec 5, 2025
37e0630
feat: diseases mapper task
francisco-ebi Dec 5, 2025
26 changes: 26 additions & 0 deletions impc_etl/jobs/ingest/data_ingestion.py
@@ -144,6 +144,32 @@ def copy_products_report():

return tracking_products_mice_file_asset

tracking_products_crispr_file_asset = create_input_asset(
"tracking/gentar-products_crispr-latest.json"
)

@asset.multi(
schedule=[tracking_directory_asset],
outlets=[tracking_products_crispr_file_asset],
dag_id=f"{dr_tag}_copy_products_crispr_report",
)
def copy_products_crispr_report():
"""Copy the GenTar CRISPR products report when the tracking directory is available"""
source_file = f"{data_archive_path}/gentar-data-archive/product_reports/gentar-products_crispr-latest.json"
target_file = f"{input_data_path}/tracking/gentar-products_crispr.json"

task_logger.info(f"Copying tracking data from {source_file} to {target_file}")

# Ensure source file exists
if not os.path.exists(source_file):
raise FileNotFoundError(f"Source tracking file not found: {source_file}")

# Copy the file
shutil.copy(source_file, target_file)
task_logger.info(f"Successfully copied tracking data to {target_file}")

return tracking_products_crispr_file_asset


gene_interest_asset = create_input_asset("tracking/gene_interest.tsv")
gene_interest_json_asset = create_input_asset("tracking/gene_interest.json")
0 changes: 0 additions & 0 deletions impc_etl/jobs/load/impc_spa/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions impc_etl/jobs/load/impc_spa/impc_chromosome_mapper.py
@@ -0,0 +1,36 @@
import logging
import textwrap
from airflow.sdk import Variable, asset
from impc_etl.utils.airflow import create_input_asset, create_output_asset
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

gene_summary_service_json_asset = create_input_asset("output/impc_web_api/gene_summary_service_json")
chromosome_map_json_asset = create_output_asset("impc_spa/chromosome-map.json")

@asset.multi(
schedule=[gene_summary_service_json_asset],
outlets=[chromosome_map_json_asset],
dag_id=f"{dr_tag}_impc_spa_chromosome_mapper",
description=textwrap.dedent(
"""IMPC SPA chromosome mapper DAG."""
),
tags=["impc_spa", "chromosome map"],
)
@with_spark_session
def impc_spa_chromosome_mapper():
import json
from pyspark.sql import SparkSession
from urllib.parse import unquote, urlparse

spark = SparkSession.builder.getOrCreate()
gene_summary_service_json_path = gene_summary_service_json_asset.uri
gene_summary_df = spark.read.json(gene_summary_service_json_path)
gene_summary_df = gene_summary_df.select("mgiGeneAccessionId", "chrName")
# collect (mgiGeneAccessionId, chrName) rows and flatten them into a plain accession-ID -> chromosome dict
gene_list = map(lambda row: row.asDict(), gene_summary_df.collect())
chromosome_map_dict = {gene["mgiGeneAccessionId"]: gene["chrName"] for gene in gene_list}
output_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(output_path, "w") as output_file:
output_file.write(json.dumps(chromosome_map_dict))
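
Note: the mapper above writes a single flat JSON object keyed by MGI accession ID. The sketch below shows how the downstream SPA mappers in this PR load that file back into a small Spark DataFrame for joining; the file path and MGI IDs are placeholders, not real data.

import json

from pyspark.sql import SparkSession

# chromosome-map.json is a flat mapping such as {"MGI:0000001": "1", "MGI:0000002": "X"}
# (hypothetical IDs shown here); real entries come from gene_summary_service_json
with open("/path/to/output/impc_spa/chromosome-map.json") as map_file:
    chromosome_map = json.load(map_file)

spark = SparkSession.builder.getOrCreate()

# the downstream mappers turn the dict into a two-column DataFrame so it can be
# joined with the web API service JSON on mgiGeneAccessionId
chromosome_map_df = spark.createDataFrame(
    chromosome_map.items(), ["mgiGeneAccessionId", "chromosome"]
)
chromosome_map_df.show(5)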
89 changes: 89 additions & 0 deletions impc_etl/jobs/load/impc_spa/impc_diseases_mapper.py
@@ -0,0 +1,89 @@
import json
import logging
import textwrap
from airflow.sdk import Variable, task, AssetAlias, chain, dag
from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
from impc_etl.utils.impc_spa import generate_valid_json_from_file
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

gene_diseases_service_json_asset = create_input_asset("output/impc_web_api/gene_diseases_service_json")
chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
gene_diseases_asset = AssetAlias("impc_spa_diseases")

@dag(
schedule=[gene_diseases_service_json_asset, chromosome_map_json_asset],
dag_id=f"{dr_tag}_impc_spa_diseases_mapper",
description=textwrap.dedent(
"""IMPC SPA diseases mapper DAG."""
),
tags=["impc_spa", "diseases"],
)
def impc_spa_gene_diseases_mapper():
@with_spark_session
@task
def process_gene_diseases():
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from urllib.parse import unquote, urlparse

spark = SparkSession.builder.getOrCreate()

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)
chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])
gene_diseases_service_json_path = gene_diseases_service_json_asset.uri
gene_diseases_df = spark.read.json(gene_diseases_service_json_path)
result_df = chromosome_map_df.join(gene_diseases_df, "mgiGeneAccessionId", "left_outer")
result_df = result_df.filter(col("associationCurated").isNotNull())
result_df = result_df.drop("chromosome")
result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
(result_df
.repartition("mgiGeneAccessionId")
.write
.mode("overwrite")
.partitionBy("mgiGeneAccessionId", "associationCurated")
.json(f"{get_data_release_work_dir()}/output/impc_spa/gene_diseases_temp_json")
)
task_logger.info("Finished")

@task(outlets=[gene_diseases_asset])
def process_temp_folder(*, outlet_events):
import os
import shutil
from glob import iglob
from urllib.parse import unquote, urlparse

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)

input_path = f"{get_data_release_work_dir()}/output/impc_spa/gene_diseases_temp_json"
output_path = f"{get_data_release_work_dir()}/output/impc_spa"
for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
# partition path looks like .../mgiGeneAccessionId=<id>/associationCurated=<true|false>/part-*.json
filepath_parts = file_path.split("/")
filepath_parts.pop()
parent_dir = filepath_parts.pop()
association_status = parent_dir.split("=")[1]
parent_dir = filepath_parts.pop()
mgi_gene_accession_id = parent_dir.split("=")[1]
original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")

chromosome = chromosome_map_json[original_mgi_gene_accession_id]
chromosome_folder = f"{output_path}/{chromosome}"
os.makedirs(chromosome_folder, exist_ok=True)

gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
os.makedirs(gene_dir_path, exist_ok=True)
file_name = 'associated-diseases' if association_status == "true" else 'predicted-diseases'
gene_external_links_path = f"{gene_dir_path}/{file_name}.json"
with open(gene_external_links_path, "w") as gene_file:
gene_file.write(generate_valid_json_from_file(file_path))
shutil.rmtree(input_path)
print("Finished")

chain(process_gene_diseases(), process_temp_folder())
impc_spa_gene_diseases_mapper()
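
generate_valid_json_from_file comes from impc_etl.utils.impc_spa and is not shown in this diff. Based on how it is used here, and on the manual equivalent in the CRISPR allele mapper below, it presumably turns one of Spark's newline-delimited JSON part files into a single valid JSON array string. A minimal sketch under that assumption:

def generate_valid_json_from_file(file_path: str) -> str:
    """Assumed behaviour: wrap the newline-delimited JSON objects written by Spark
    into one valid JSON array string (a guess, not the code from this diff)."""
    with open(file_path) as part_file:
        json_objects = [line.strip() for line in part_file if line.strip()]
    return f"[{','.join(json_objects)}]"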
96 changes: 96 additions & 0 deletions impc_etl/jobs/load/impc_spa/impc_gene_all_phenotype_data_mapper.py
@@ -0,0 +1,96 @@
import logging
import textwrap
from airflow.sdk import Variable, task, AssetAlias, chain, dag
from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
from impc_etl.utils.impc_spa import generate_valid_json_from_file
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

datasets_metadata_service_json_asset = create_input_asset("output/impc_web_api/datasets_metadata_service_json")
chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")
gene_all_phenotype_data_asset = AssetAlias("impc_spa_gene_all_phenotype_data")

@dag(
schedule=[datasets_metadata_service_json_asset, chromosome_map_json_asset],
dag_id=f"{dr_tag}_impc_spa_all_phenotype_data_mapper",
description=textwrap.dedent(
"""IMPC SPA gene all phenotype data mapper DAG."""
),
tags=["impc_spa", "gene", "all phenotype data"],
)
def impc_spa_gene_all_phenotype_data_mapper():
@with_spark_session
@task
def process_parquet():
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from urllib.parse import unquote, urlparse
import json

spark = SparkSession.builder.getOrCreate()

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)
chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])

dataset_df = spark.read.json(datasets_metadata_service_json_asset.uri)
result_df = chromosome_map_df.join(dataset_df, "mgiGeneAccessionId", "left_outer")
result_df = result_df.filter(col("pipelineStableId").isNotNull())
result_df = result_df.drop("chromosome")
result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))

(result_df
.repartition("mgiGeneAccessionId")
.write
.mode("overwrite")
.partitionBy("mgiGeneAccessionId", "pipelineStableId", "procedureStableId")
.json(f"{get_data_release_work_dir()}/output/impc_spa/all_ph_data_temp_json")
)

task_logger.info("Finished")

@task(outlets=[gene_all_phenotype_data_asset])
def process_temp_folder(*, outlet_events):
import json
import os
import shutil
from glob import iglob
from urllib.parse import unquote, urlparse

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)

input_path = f"{get_data_release_work_dir()}/output/impc_spa/all_ph_data_temp_json"
output_path = f"{get_data_release_work_dir()}/output/impc_spa"
for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
# partition path looks like .../mgiGeneAccessionId=<id>/pipelineStableId=<id>/procedureStableId=<id>/part-*.json
filepath_parts = file_path.split("/")
filepath_parts.pop()
parent_dir = filepath_parts.pop()
procedure_stable_id = parent_dir.split("=")[1]
parent_dir = filepath_parts.pop()
pipeline_stable_id = parent_dir.split("=")[1]
parent_dir = filepath_parts.pop()
mgi_gene_accession_id = parent_dir.split("=")[1]
original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")

chromosome = chromosome_map_json[original_mgi_gene_accession_id]
chromosome_folder = f"{output_path}/{chromosome}"
os.makedirs(chromosome_folder, exist_ok=True)

gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
os.makedirs(gene_dir_path, exist_ok=True)
pipeline_root_dir_path = f"{gene_dir_path}/pipeline"
os.makedirs(pipeline_root_dir_path, exist_ok=True)
pipeline_dir_path = f"{pipeline_root_dir_path}/{pipeline_stable_id}"
os.makedirs(pipeline_dir_path, exist_ok=True)
# write one valid JSON file per gene, pipeline and procedure combination
file_to_be_generated_path = f"{pipeline_dir_path}/{procedure_stable_id}.json"
with open(file_to_be_generated_path, "w") as dataset_file:
dataset_file.write(generate_valid_json_from_file(file_path))
shutil.rmtree(input_path)
task_logger.info("Finished")

chain(process_parquet(), process_temp_folder())
impc_spa_gene_all_phenotype_data_mapper()
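
For reference, a small helper mirroring the directory logic above, showing where each procedure-level JSON file ends up; the gene, pipeline and procedure identifiers in the example are hypothetical.

import os

def dataset_output_path(base_dir, chromosome, mgi_gene_accession_id,
                        pipeline_stable_id, procedure_stable_id):
    # one file per gene / pipeline / procedure, with ":" in the MGI ID replaced by "_"
    gene_dir = mgi_gene_accession_id.replace(":", "_")
    return os.path.join(base_dir, chromosome, gene_dir, "pipeline",
                        pipeline_stable_id, f"{procedure_stable_id}.json")

# e.g. dataset_output_path("output/impc_spa", "11", "MGI:0000001", "PIPELINE_001", "PROC_001")
# -> "output/impc_spa/11/MGI_0000001/pipeline/PIPELINE_001/PROC_001.json"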
102 changes: 102 additions & 0 deletions impc_etl/jobs/load/impc_spa/impc_gene_allele_crispr_mapper.py
@@ -0,0 +1,102 @@
import logging
import textwrap
from airflow.sdk import Variable, task, dag, chain

from impc_etl.utils.airflow import create_input_asset, get_data_release_work_dir
from impc_etl.utils.impc_spa import create_gentar_crispr_report_df
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

gentar_products_crispr_latest_json_output_asset = create_input_asset("tracking/gentar-products_crispr.json")
chromosome_map_json_asset = create_input_asset("output/impc_spa/chromosome-map.json")

@dag(
schedule=[gentar_products_crispr_latest_json_output_asset, chromosome_map_json_asset],
dag_id=f"{dr_tag}_impc_spa_gene_allele_crispr_mapper",
description=textwrap.dedent(
"""IMPC SPA gene allele crispr mapper DAG."""
),
tags=["impc_spa", "allele", "crispr"],
)
def impc_spa_gene_allele_crispr_mapper():
@with_spark_session
@task
def process_allele_crispr_data():
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from urllib.parse import unquote, urlparse

spark = SparkSession.builder.getOrCreate()

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)
chromosome_map_df = spark.createDataFrame(chromosome_map_json.items(), ["mgiGeneAccessionId", "chromosome"])

product_crispr_df = create_gentar_crispr_report_df(spark, gentar_products_crispr_latest_json_output_asset.uri)

result_df = chromosome_map_df.join(product_crispr_df, "mgiGeneAccessionId", "left_outer")
result_df = result_df.filter(col("alleleSuperscript").isNotNull())
result_df = result_df.drop("chromosome")
result_df = result_df.withColumn("mgiGeneAccessionId", regexp_replace("mgiGeneAccessionId", ":", "_"))
(result_df
.repartition("mgiGeneAccessionId")
.write
.mode("overwrite")
.partitionBy("mgiGeneAccessionId")
.json(f"{get_data_release_work_dir()}/output/impc_spa/allele_crispr_temp_json")
)

task_logger.info("Finished")

@task
def process_temp_folder():
import json
import os
import shutil
from glob import iglob
from urllib.parse import unquote, urlparse

chromosome_map_json_path = unquote(urlparse(chromosome_map_json_asset.uri).path)
with open(chromosome_map_json_path) as chromosome_map_file:
chromosome_map_json = json.load(chromosome_map_file)

input_path = f"{get_data_release_work_dir()}/output/impc_spa/allele_crispr_temp_json"
output_path = f"{get_data_release_work_dir()}/output/impc_spa"
for file_path in iglob(f"{input_path}/**/*.json", recursive=True):
filepath_parts = file_path.split("/")
filepath_parts.pop()
parent_dir = filepath_parts.pop()
mgi_gene_accession_id = parent_dir.split("=")[1]
original_mgi_gene_accession_id = mgi_gene_accession_id.replace("_", ":")
chromosome = chromosome_map_json[original_mgi_gene_accession_id]
chromosome_folder = f"{output_path}/{chromosome}"
os.makedirs(chromosome_folder, exist_ok=True)
gene_dir_path = f"{chromosome_folder}/{mgi_gene_accession_id}"
os.makedirs(gene_dir_path, exist_ok=True)

allele_dict = {}
# a single part file can hold multiple JSON objects for distinct alleles, one per line
with open(file_path, "r") as allele_data:
for json_obj_str in allele_data:
allele_obj = json.loads(json_obj_str)
if allele_obj["alleleSuperscript"] in allele_dict:
allele_dict[allele_obj["alleleSuperscript"]].append(json_obj_str)
else:
allele_dict[allele_obj["alleleSuperscript"]] = [json_obj_str]


general_alleles_dir_path = f"{gene_dir_path}/alleles"
os.makedirs(general_alleles_dir_path, exist_ok=True)
for original_allele_name, allele_json_list in allele_dict.items():
allele_name = original_allele_name.replace("/", "_")
allele_dir_path = f"{general_alleles_dir_path}/{allele_name}"
os.makedirs(allele_dir_path, exist_ok=True)
file_to_be_generated_path = f"{allele_dir_path}/crispr.json"
with open(file_to_be_generated_path, "w") as allele_file:
allele_file.write(f"[{','.join(allele_json_list)}]")
shutil.rmtree(input_path)
task_logger.info("Finished")

chain(process_allele_crispr_data(), process_temp_folder())
impc_spa_gene_allele_crispr_mapper()
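
The per-allele grouping in process_temp_folder builds its dictionary by hand; an equivalent sketch using collections.defaultdict (same behaviour, not part of this diff) would be:

import json
from collections import defaultdict

def group_records_by_allele(file_path):
    # group the newline-delimited JSON records of one Spark part file by alleleSuperscript
    allele_dict = defaultdict(list)
    with open(file_path) as allele_data:
        for json_obj_str in allele_data:
            if json_obj_str.strip():
                allele_dict[json.loads(json_obj_str)["alleleSuperscript"]].append(json_obj_str)
    return allele_dict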