Commit 8f94f9f

Merge remote-tracking branch 'origin/dev' into feature/impc-spa
2 parents: 93cc684 + cae0cc7

2 files changed: 54 additions (+), 24 deletions (-)

impc_etl/jobs/ingest/data_ingestion.py

Lines changed: 4 additions & 3 deletions
@@ -314,7 +314,7 @@ def fetch_dcc_xml_data(context):

     # Define the output file path
     zip_file_path = f"{data_archive_path}/{dr_tag}/{dcc_xml_zip_file_name}"
-    extract_path = f"{data_archive_path}/{dr_tag}/dcc_xml_extracted"
+    extract_path = f"{data_archive_path}/{dr_tag}/dcc_xml_extracted/"
     impc_xml_output_path = f"{input_data_path}/xml/impc/"

     try:
@@ -358,10 +358,11 @@ def fetch_dcc_xml_data(context):
         task_logger.info(f"File extracted successfully: {extract_path}")

         # Copy extracted data to final output directory skipping the top-level directory 'extracted'
-        for item in os.listdir(extract_path):
+        data_path = f"{extract_path}/data/"
+        for item in os.listdir(data_path):
             if item.startswith("._"):
                 continue  # Skip macOS dot files
-            s = os.path.join(extract_path, item)
+            s = os.path.join(data_path, item)
             d = os.path.join(impc_xml_output_path, item)
             if os.path.isdir(s):
                 shutil.copytree(s, d, dirs_exist_ok=True)
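
In isolation, the change above means the ingestion step now descends into the archive's data/ subfolder before copying into the IMPC XML input directory. A minimal standalone sketch of that copy step, using illustrative placeholder paths (the real ones come from the DAG configuration) and an assumed plain-file branch, since the hunk ends before showing it:

import os
import shutil

extract_path = "/tmp/dcc_xml_extracted"        # placeholder for the archive extraction dir
impc_xml_output_path = "/tmp/input/xml/impc"   # placeholder for the XML output dir

data_path = f"{extract_path}/data/"            # new behaviour: read from the data/ subfolder
os.makedirs(impc_xml_output_path, exist_ok=True)

for item in os.listdir(data_path):
    if item.startswith("._"):
        continue  # skip macOS resource-fork files
    s = os.path.join(data_path, item)
    d = os.path.join(impc_xml_output_path, item)
    if os.path.isdir(s):
        shutil.copytree(s, d, dirs_exist_ok=True)  # merge directories into the output
    else:
        shutil.copy2(s, d)  # assumed handling for plain files, not shown in the hunk
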

impc_etl/utils/spark.py

Lines changed: 50 additions & 21 deletions
@@ -1,5 +1,6 @@
 import logging
 import os
+import sys
 from functools import wraps

 from airflow.hooks.base import BaseHook
@@ -26,7 +27,9 @@ def wrapper():
         spark_master = spark_connection.get("spark_master", "localhost")
         environment = Variable.get("environment", "development")
         task_logger.info(f"Spark connection URL: {spark_connection}")
-        task_logger.info("::group::SPARK LOGS")
+        task_logger.info(f"Initializing Spark session for function: {func.__name__} on {environment} environment")
+        task_logger.info(f"Driver Python executable: {sys.executable}")
+        task_logger.info(f"Driver Python version: {sys.version}")

         conf = (
             SparkConf()
@@ -40,15 +43,15 @@ def wrapper():
                 "spark.driver.extraJavaOptions",
                 "-Dlog4j.configuration=file:/opt/airflow/log4j.properties",
             )
-            .set("spark.pyspark.python", ".")
+
         )

         if environment == "development":
             conf.set("spark.driver.memory", "2g").set(
                 "spark.executor.memory", "3g"
             ).set("spark.driver.maxResultSize", "2g").set(
                 "spark.executor.cores", "2"
-            )
+            ).set("spark.pyspark.python", ".")
         else:
             # Production configuration matching Luigi settings
             conf.set("spark.driver.memory", "30g").set(
@@ -65,7 +68,7 @@ def wrapper():
                 "spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED"
             ).set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED").set(
                 "spark.sql.session.timeZone", "UTC"
-            )
+            ).set("spark.pyspark.python", sys.executable).set("spark.pyspark.driver.python", sys.executable)

         jars_packages = []
         if postgres_database:
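
The production branch now pins both executor and driver Python to the interpreter that launched the task. A minimal sketch of that configuration idea outside the decorator, with an assumed local master and a hypothetical app name; the real connection details come from Airflow:

import sys

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
    .setMaster("local[2]")                                # assumed master for illustration
    .setAppName("pyspark-python-pinning-demo")            # hypothetical app name
    .set("spark.pyspark.python", sys.executable)          # Python used by executors
    .set("spark.pyspark.driver.python", sys.executable)   # Python used by the driver
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.sparkContext.getConf().get("spark.pyspark.python"))
spark.stop()
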
@@ -81,37 +84,56 @@ def wrapper():
         conf.set("spark.mongodb.read.connection.uri", mongo_url)
         conf.set("spark.mongodb.write.connection.uri", mongo_url)

-        # Add the impc_etl module to Spark workers using PYTHONPATH
-        # Since all workers run on the same file system, we can use PYTHONPATH
+        # Add the impc_etl module to Spark workers
         import impc_etl
+        import zipfile
+        import tempfile

         impc_etl_path = os.path.dirname(impc_etl.__file__)
         parent_path = os.path.dirname(impc_etl_path)
         task_logger.info(
-            f"Adding impc_etl module path to PYTHONPATH: {parent_path}"
+            f"Preparing impc_etl module from: {parent_path}"
         )

-        # Get an existing PYTHONPATH if any and append our path
-        existing_python_path = os.environ.get("PYTHONPATH", "")
-        if existing_python_path:
-            updated_python_path = f"{parent_path}:{existing_python_path}"
-        else:
-            updated_python_path = parent_path
+        # Create a temporary zip file of the impc_etl module
+        temp_zip_path = None
+        temp_zip_fd, temp_zip_path = tempfile.mkstemp(suffix='.zip', prefix='impc_etl_')
+        os.close(temp_zip_fd)  # Close the file descriptor

-        # Set PYTHONPATH for executors through Spark configuration
-        conf.set("spark.executorEnv.PYTHONPATH", updated_python_path)
-        conf.set("spark.yarn.appMasterEnv.PYTHONPATH", updated_python_path)
-
-        # Also set it for the current environment to ensure consistency
-        os.environ["PYTHONPATH"] = updated_python_path
-
-        spark = SparkSession.builder.config(conf=conf).getOrCreate()
+        try:
+            # Create zip file containing all Python files
+            with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+                for root, dirs, files in os.walk(impc_etl_path):
+                    # Skip __pycache__ directories
+                    dirs[:] = [d for d in dirs if d != '__pycache__']
+                    for file in files:
+                        if file.endswith('.py'):
+                            file_path = os.path.join(root, file)
+                            arcname = os.path.join('impc_etl', os.path.relpath(file_path, impc_etl_path))
+                            zipf.write(file_path, arcname)
+
+            task_logger.info(f"Created impc_etl zip: {temp_zip_path} (size: {os.path.getsize(temp_zip_path)} bytes)")
+
+            # Add zip to Spark configuration before creating session
+            conf.set("spark.submit.pyFiles", temp_zip_path)
+
+            spark = SparkSession.builder.config(conf=conf).getOrCreate()
+
+            # Also add via sparkContext for runtime distribution
+            spark.sparkContext.addPyFile(temp_zip_path)
+            task_logger.info(f"Added impc_etl module to Spark workers")
+        except Exception as e:
+            task_logger.error(f"Failed to prepare impc_etl module: {e}")
+            if temp_zip_path and os.path.exists(temp_zip_path):
+                os.unlink(temp_zip_path)
+            raise

         spark_logger = logging.getLogger("spark")
         spark_logger.setLevel(logging.WARNING)
         spark_logger.handlers = task_logger.handlers
         spark.sparkContext.setLogLevel("WARN")

+        task_logger.info("::group::SPARK LOGS")
         try:
             result = func()
             return result
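
The key change in this hunk is the distribution mechanism: instead of pointing executors at a shared PYTHONPATH, the decorator zips the impc_etl package and ships it via spark.submit.pyFiles plus addPyFile. A self-contained sketch of that pattern, where mypkg is a hypothetical stand-in for any locally importable package (impc_etl plays this role in the commit):

import os
import tempfile
import zipfile

from pyspark.sql import SparkSession


def zip_package(package_path: str, package_name: str) -> str:
    """Zip a package's .py files so Spark can ship it to executors."""
    fd, zip_path = tempfile.mkstemp(suffix=".zip", prefix=f"{package_name}_")
    os.close(fd)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(package_path):
            dirs[:] = [d for d in dirs if d != "__pycache__"]  # skip bytecode caches
            for name in files:
                if name.endswith(".py"):
                    full = os.path.join(root, name)
                    # Keep the package name as the top-level folder inside the zip
                    zf.write(full, os.path.join(package_name, os.path.relpath(full, package_path)))
    return zip_path


if __name__ == "__main__":
    import mypkg  # hypothetical package; impc_etl plays this role in the commit

    archive = zip_package(os.path.dirname(mypkg.__file__), "mypkg")
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    spark.sparkContext.addPyFile(archive)  # executors can now `import mypkg`
    spark.stop()
    os.unlink(archive)  # mirror the commit's cleanup in `finally`

Setting spark.submit.pyFiles before the session is created and calling addPyFile afterwards mirrors the belt-and-braces approach in the diff above.
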
@@ -124,6 +146,13 @@ def wrapper():
                 task_logger.warning(f"Error stopping Spark session: {e}")
                 # Continue execution as the task may have completed successfully
         finally:
+            # Clean up temporary zip file
+            if temp_zip_path and os.path.exists(temp_zip_path):
+                try:
+                    os.unlink(temp_zip_path)
+                    task_logger.info("Cleaned up temporary impc_etl zip file")
+                except Exception as e:
+                    task_logger.warning(f"Failed to clean up zip file: {e}")
             task_logger.info("::endgroup::")

     return wrapper
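
The finally block guarantees the temporary archive is removed even when the wrapped task fails, and catching the unlink error keeps cleanup from masking the task's own outcome. A standalone sketch of that cleanup pattern, assuming the zip was created with tempfile.mkstemp as in the earlier hunk:

import logging
import os
import tempfile

log = logging.getLogger(__name__)

fd, temp_zip_path = tempfile.mkstemp(suffix=".zip", prefix="impc_etl_")
os.close(fd)

try:
    pass  # ... work that ships the zip to Spark would happen here ...
finally:
    if temp_zip_path and os.path.exists(temp_zip_path):
        try:
            os.unlink(temp_zip_path)
            log.info("Cleaned up temporary zip file")
        except OSError as e:
            log.warning(f"Failed to clean up zip file: {e}")
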
