PySpark predict_batch_udf causes CUDA OOM in a Kubeflow Pipeline, but not in Kubeflow Jupyter

I’ve spent over 20 hours on this problem. I use a SentenceTransformer model to embed ~3 million text documents and write them to OpenSearch. I’m using PySpark’s predict_batch_udf and running the job as a Kubeflow Pipeline. The pytorch_model.bin is ~500MB, so loading it should need well under 1 GB of memory.

Basic code structure:

import os

import pyspark
import torch
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer

def main():
    spark = (pyspark.sql.SparkSession.builder.appName('xxx')
             .config("spark.hadoop.fs.s3a.endpoint", "http://xx")
             .config("spark.executor.instances", os.environ['exec_inst'])
             .config("spark.executor.cores", os.environ['exec_core'])
             .......
             .getOrCreate())  # actually written in another SparkUtil object, but same content
    soln_df = spark.read.format("delta").load(db_name)
    model = SentenceTransformer(os.environ['ranking_model_dir']).eval()
    model_bc = spark.sparkContext.broadcast(model)

    def make_predict_fn():
        def predict(inputs):
            with torch.no_grad():
                with torch.cuda.amp.autocast():
                    soln_embed = model_bc.value.encode(inputs, normalize_embeddings=True)
                    torch.cuda.empty_cache()
                    return soln_embed
        return predict

    # create a standard pandas UDF from the predict function factory
    embed = predict_batch_udf(make_predict_fn,
                              return_type=ArrayType(FloatType()),
                              batch_size=8)  # smallest bs to use tensor cores (4*32=128)
    df = soln_df.withColumn("embeddings", embed('text_to_embed')).drop('text_to_embed')
    spark_util.write_to_opensearch(df, os_index, 'my_id')  # using the same SparkUtil object

Now, I’ve tried practically every layout of this code: I’ve moved the function definition and the model loading in and out of main(), and tried broadcasting the model vs not broadcasting it.
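
For example, one of those layouts loads the model lazily inside make_predict_fn on each executor instead of broadcasting it from the driver; roughly like this sketch (same placeholders as the code above):

def make_predict_fn():
    # load the model inside the factory, so it is created once per Python worker
    model = SentenceTransformer(os.environ['ranking_model_dir']).eval()

    def predict(inputs):
        with torch.no_grad():
            with torch.cuda.amp.autocast():
                return model.encode(inputs, normalize_embeddings=True)

    return predict

embed = predict_batch_udf(make_predict_fn,
                          return_type=ArrayType(FloatType()),
                          batch_size=8)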

The weird part is: the exact same code, when I ran it on kubeflow jupyter notebook, it just works. The cell runs for an hour or two then finishes, Cuda memory goes up to ~15G, my GPU has 16G VRAM. But when I put it in python script and run in a kubeflow job pod, it would barely last 2 minutes before crashing. I even tried making the main() async and run in eventloop to simulate jupyter notebook, but no help.
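
(In case it helps anyone reproduce the comparison: a minimal way to watch per-process GPU memory from Python, which could also be logged from inside predict, is something like the snippet below. log_gpu_mem is just an illustrative helper, not part of my actual job.)

import torch

def log_gpu_mem(tag=""):
    # free/total device memory as seen by the CUDA driver for this process
    free, total = torch.cuda.mem_get_info()
    # memory held by this process's PyTorch caching allocator
    alloc = torch.cuda.memory_allocated()
    reserved = torch.cuda.memory_reserved()
    print(f"[{tag}] free={free/2**30:.2f}G total={total/2**30:.2f}G "
          f"torch_alloc={alloc/2**30:.2f}G torch_reserved={reserved/2**30:.2f}G")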

From what I saw, it’s just two distinct types of error.
With broadcast:

GPU info, spark info run config stuff
/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:224: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
24/06/29 06:56:27 ERROR Executor: Exception in task 17.0 in stage 5.0 (TID 80)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/pyspark/ml/functions.py", line 806, in predict
    preds = predict_fn(single_input)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/process_rerank_embd_gpu.py", line 46, in predict
    soln_embed = model_bc.value.encode(inputs, normalize_embeddings=True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   ......
   [bunch of propagation of errors, pytorch this calls that stuff]
   ......
  File "/usr/local/lib/python3.11/site-packages/torch/nn/functional.py", line 2237, in embedding
    return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.

Without broadcast:

  same stuff as above
/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:224: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
24/06/29 05:16:23 ERROR Executor: Exception in task 38.0 in stage 5.0 (TID 101)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/pyspark/ml/functions.py", line 806, in predict
    preds = predict_fn(single_input)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/process_rerank_embd_gpu.py", line 45, in predict
    soln_embed = ranking_model.encode(inputs, normalize_embeddings=True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   ......
   [bunch of propagation of errors, pytorch this calls that stuff]
   ......
  File "/usr/local/lib/python3.11/site-packages/transformers/models/distilbert/modeling_distilbert.py", line 241, in forward
    context = torch.matmul(weights, v)  # (bs, n_heads, q_length, dim_per_head)
              ^^^^^^^^^^^^^^^^^^^^^^^^
torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacity of 15.77 GiB of which 19.12 MiB is free. Process 41502 has 1.07 GiB memory in use. Process 2489411 has 614.00 MiB memory in use. Process 2489412 has 614.00 MiB memory in use. Process 2489410 has 784.00 MiB memory in use. Process 2489329 has 792.00 MiB memory in use. Process 2489401 has 796.00 MiB memory in use. Process 2489397 has 614.00 MiB memory in use. Process 2489199 has 796.00 MiB memory in use. Process 2489286 has 614.00 MiB memory in use. Process 2489203 has 634.00 MiB memory in use. Process 2489334 has 614.00 MiB memory in use. Process 2489362 has 614.00 MiB memory in use. Process 2489357 has 614.00 MiB memory in use. Process 2489264 has 614.00 MiB memory in use. Process 2489366 has 788.00 MiB memory in use. Process 2489240 has 772.00 MiB memory in use. Process 2489230 has 614.00 MiB memory in use. Process 2489226 has 614.00 MiB memory in use. Process 2489451 has 614.00 MiB memory in use. Process 2489306 has 614.00 MiB memory in use. Process 2489376 has 614.00 MiB memory in use. Process 2489315 has 614.00 MiB memory in use. Process 2489259 has 614.00 MiB memory in use. Process 2489399 has 432.00 MiB memory in use. Of the allocated memory 411.66 MiB is allocated by PyTorch, and 8.34 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

Both of the above are just one of many errors with the same pattern, which indicates multiple processes are hitting the GPU at the same time, so executor cores and instances play a role; but reducing them doesn’t help. And interestingly, my notebook’s Spark settings are 2 instances with 16 cores, and it still works.
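
For reference, the Spark-side knobs that control how many tasks can use the GPU at once are the Spark 3.x GPU resource-scheduling configs; a minimal sketch of that part of the session builder (the discovery-script path is just a placeholder, and I haven’t confirmed this changes anything here):

import pyspark

spark = (pyspark.sql.SparkSession.builder.appName('xxx')
         # one GPU per executor, and only one task at a time allowed to use it
         .config("spark.executor.resource.gpu.amount", "1")
         .config("spark.task.resource.gpu.amount", "1")
         # required for GPU-aware scheduling; path is a placeholder
         .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpusResources.sh")
         .getOrCreate())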
