I want to load data from a tab-separated ('\t') CSV file into Cassandra. I used COPY and INSERT INTO in my Python script, but I get an error like:
java.lang.RuntimeException: java.lang.AssertionError: Attempted serializing to buffer exceeded maximum of 65535 bytes: 1700053
Code:
import re
import subprocess
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider


def pd_to_cassandra_type(pd_series):
    # Map a pandas dtype to a Cassandra column type.
    if pd.api.types.is_integer_dtype(pd_series):
        return 'bigint'
    elif pd.api.types.is_float_dtype(pd_series):
        return 'double'
    else:
        return 'text'


def clean_column_name(name):
    # Replace non-word characters and avoid names starting with a digit.
    name = re.sub(r'\W+', '_', name)
    if name[0].isdigit():
        name = '_' + name
    return name


def create_table(session, keyspace_name, table_name, column_definition):
    session.execute(f"""
        CREATE KEYSPACE IF NOT EXISTS {keyspace_name}
        WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }}
    """)
    session.set_keyspace(keyspace_name)
    session.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {column_definition},
            id UUID PRIMARY KEY
        )
    """)


def split_and_load_data():
    file_path = 'path/to/your/large_file.tab'
    keyspace_name = 'my_keyspace'
    table_name = "my_table"
    data = pd.read_csv(file_path, delimiter='\t')
    # Clean and reassign column names
    data.columns = [clean_column_name(col) for col in data.columns]
    user = "user_name"
    password = "1234"
    auth_provider = PlainTextAuthProvider(username=user, password=password)
    cluster = Cluster(["192.12.4.5"], auth_provider=auth_provider,
                      connect_timeout=1000, control_connection_timeout=1000)
    session = cluster.connect()
    column_definitions = ", ".join([f"{col} text" for col in data.columns])
    # Create the table based on the first group of columns
    create_table(session, keyspace_name, table_name, column_definitions)
    copy_command = (
        f"cqlsh 192.12.4.5 -e \"COPY {keyspace_name}.{table_name} "
        f"({', '.join(data.columns)}) FROM '{file_path}' "
        f"WITH DELIMITER='\t' AND HEADER=TRUE;\""
    )
    subprocess.run(copy_command, shell=True, check=True)
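At the top I say I used COPY and INSERT INTO, but the script above only shows the COPY path. The INSERT INTO variant looks roughly like the following (a simplified sketch, not my exact code; the insert_rows helper and the uuid4() id are just illustrative):

from uuid import uuid4

def insert_rows(session, keyspace_name, table_name, data):
    # Sketch: insert DataFrame rows one by one with a prepared statement.
    # All columns are created as text above, so values are passed as strings.
    columns = list(data.columns)
    insert_cql = (
        f"INSERT INTO {keyspace_name}.{table_name} (id, {', '.join(columns)}) "
        f"VALUES (?{', ?' * len(columns)})"
    )
    prepared = session.prepare(insert_cql)
    for row in data.itertuples(index=False):
        session.execute(prepared, (uuid4(), *[str(v) for v in row]))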
The CSV file is approximately 500 MB. Normally Cassandra should be able to load this much data, but I still get the error above. Does anyone have any ideas?