I need to write the same data to two separate data stores. I am using PySpark's foreachPartition to process the data in parallel, and within each partition I use concurrent.futures to write to the two stores in parallel.
df.foreachPartition(
    self.manager.partition_function(arg1, arg2, ...)
)
In the Manager class:
import concurrent.futures

def partition_function(self, arg1, arg2, ...):
    def process_rows(rows):
        executor = concurrent.futures.ProcessPoolExecutor()
        # client1 / client2 are the clients for the two data stores
        storage1 = executor.submit(upload_data, client1, rows)
        storage2 = executor.submit(upload_data, client2, rows)
        storage1.result()
        storage2.result()
    return process_rows
def upload_data(client, data):
    client.upload(data)
The above approach with the concurrent.futures module results in the following error:
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
self.mapPartitions(func).count() # Force evaluation
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 13, 10.0.36.127, executor 7): org.apache.spark.api.python.PythonException: concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib64/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib64/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
If I use the Thread library instead, the job runs indefinitely; I have also tried the Process library.
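For reference, the thread-based variant I tried looked roughly like this (the exact code differed slightly; client1, client2 and upload_data are the same as above, and the list() call is only there because the partition is an iterator, so both threads need a materialized copy of the rows):

import threading

def partition_function(self, arg1, arg2, ...):
    def process_rows(rows):
        rows = list(rows)  # materialize the partition iterator so both threads see the same data
        t1 = threading.Thread(target=upload_data, args=(client1, rows))
        t2 = threading.Thread(target=upload_data, args=(client2, rows))
        t1.start()
        t2.start()
        t1.join()  # wait for both uploads to finish before the task ends
        t2.join()
    return process_rows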
What would be the right way to implement this?