
Commit 56fa69f

Change pyspark path setting for production environment
1 parent bd4eddb commit 56fa69f
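
The impc_etl package was previously exposed to Spark workers by prepending its parent directory to PYTHONPATH (via spark.executorEnv.PYTHONPATH and spark.yarn.appMasterEnv.PYTHONPATH), which only works when the driver and executors share a filesystem, an assumption the production environment evidently does not guarantee. The package is now zipped into a temporary archive and shipped to executors through spark.submit.pyFiles and SparkContext.addPyFile, with the archive removed in the task's finally block.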

File tree

1 file changed (+41 −22 lines)

impc_etl/utils/spark.py

Lines changed: 41 additions & 22 deletions
@@ -84,36 +84,48 @@ def wrapper():
         conf.set("spark.mongodb.read.connection.uri", mongo_url)
         conf.set("spark.mongodb.write.connection.uri", mongo_url)
 
-        # Add the impc_etl module to Spark workers using PYTHONPATH
-        # Since all workers run on the same file system, we can use PYTHONPATH
+        # Add the impc_etl module to Spark workers
         import impc_etl
+        import zipfile
+        import tempfile
 
         impc_etl_path = os.path.dirname(impc_etl.__file__)
         parent_path = os.path.dirname(impc_etl_path)
         task_logger.info(
-            f"Adding impc_etl module path to PYTHONPATH: {parent_path}"
+            f"Preparing impc_etl module from: {parent_path}"
         )
 
-        # Get an existing PYTHONPATH if any and append our path
-        existing_python_path = os.environ.get("PYTHONPATH", "")
-        if existing_python_path:
-            updated_python_path = f"{parent_path}:{existing_python_path}"
-        else:
-            updated_python_path = parent_path
-
-        # Set PYTHONPATH for executors through Spark configuration
-        conf.set("spark.executorEnv.PYTHONPATH", updated_python_path)
-        conf.set("spark.yarn.appMasterEnv.PYTHONPATH", updated_python_path)
-
-        # Also set it for the current environment to ensure consistency
-        os.environ["PYTHONPATH"] = updated_python_path
+        # Create a temporary zip file of the impc_etl module
+        temp_zip_fd, temp_zip_path = tempfile.mkstemp(suffix='.zip', prefix='impc_etl_')
+        os.close(temp_zip_fd)  # Close the file descriptor; only the path is needed
 
-        spark = SparkSession.builder.config(conf=conf).getOrCreate()
-        task_logger.info(f"Adding impc_etl module path: {parent_path}")
-        spark.sparkContext.addPyFile(parent_path)
-        task_logger.info(
-            f"Added impc_etl parent directory to Spark workers: {parent_path}"
-        )
+        try:
+            # Create zip file containing all Python files
+            with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+                for root, dirs, files in os.walk(impc_etl_path):
+                    # Skip __pycache__ directories
+                    dirs[:] = [d for d in dirs if d != '__pycache__']
+                    for file in files:
+                        if file.endswith('.py'):
+                            file_path = os.path.join(root, file)
+                            arcname = os.path.join('impc_etl', os.path.relpath(file_path, impc_etl_path))
+                            zipf.write(file_path, arcname)
+
+            task_logger.info(f"Created impc_etl zip: {temp_zip_path} (size: {os.path.getsize(temp_zip_path)} bytes)")
+
+            # Add zip to Spark configuration before creating the session
+            conf.set("spark.submit.pyFiles", temp_zip_path)
+
+            spark = SparkSession.builder.config(conf=conf).getOrCreate()
+
+            # Also add via sparkContext for runtime distribution
+            spark.sparkContext.addPyFile(temp_zip_path)
+            task_logger.info("Added impc_etl module to Spark workers")
+        except Exception as e:
+            task_logger.error(f"Failed to prepare impc_etl module: {e}")
+            if os.path.exists(temp_zip_path):
+                os.unlink(temp_zip_path)
+            raise
 
         spark_logger = logging.getLogger("spark")
         spark_logger.setLevel(logging.WARNING)
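
The arcname prefix is what makes the archive importable: every entry is stored under impc_etl/, so the zip root acts like a site-packages directory and Python's zipimport machinery can resolve "import impc_etl" once the archive lands on an executor's sys.path. A minimal, self-contained sketch of that mechanism (demo_pkg and the temporary paths are illustrative, not part of the commit):

import importlib
import os
import sys
import tempfile
import zipfile

# Build a tiny archive whose entries sit under a package-named prefix,
# mirroring the arcname layout used in the diff above.
fd, zip_path = tempfile.mkstemp(suffix=".zip", prefix="demo_")
os.close(fd)
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("demo_pkg/__init__.py", "VALUE = 42\n")

# Adding the zip to sys.path is essentially what Spark does on each
# executor for archives shipped via --py-files / addPyFile.
sys.path.insert(0, zip_path)
demo_pkg = importlib.import_module("demo_pkg")
print(demo_pkg.VALUE)  # prints 42

sys.path.remove(zip_path)
os.unlink(zip_path)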
@@ -133,6 +145,13 @@ def wrapper():
             task_logger.warning(f"Error stopping Spark session: {e}")
             # Continue execution as the task may have completed successfully
         finally:
+            # Clean up temporary zip file
+            if 'temp_zip_path' in locals() and os.path.exists(temp_zip_path):
+                try:
+                    os.unlink(temp_zip_path)
+                    task_logger.info("Cleaned up temporary impc_etl zip file")
+                except Exception as e:
+                    task_logger.warning(f"Failed to clean up zip file: {e}")
             task_logger.info("::endgroup::")
 
     return wrapper
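
One way to smoke-test the distribution after getOrCreate() is to force the import on the executors. This is a hypothetical check, not part of the commit, and it assumes the spark session built above is in scope:

def _probe(_):
    # Runs on an executor; the import should resolve from the shipped zip.
    import impc_etl
    return impc_etl.__file__

# Two partitions, so the import is exercised by more than one task.
print(spark.sparkContext.parallelize(range(2), 2).map(_probe).collect())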
