I wrote a Python script that connects a Treasure Data workflow and database, so I am able to get data from Treasure Data. However, while uploading data back to Treasure Data I run into many errors — every time I solve one error, a new one appears. Does anyone have an idea how to upload DataFrame data to TD? Please help!
Here is the sample code I tried; I tried a number of approaches but none worked out.
import tdclient
import pandas as pd
import json
import pytd

# NOTE(review): avoid hard-coding API keys; read from an env var or secret store.
apikey = '768/6f8d328973hne4d6a3c920okjnl'
database = 'ge_src_dev'
table = 'jann_transformed'

# --- Read: run a Presto query against Treasure Data -----------------------
client = tdclient.Client(apikey)
job = client.query(
    database,
    "SELECT cast(lower(api_data) as json) FROM janra_py",
    type='presto',
)
# BUG in original: [job.status(), job.finished()] polls once and does not wait.
job.wait()
resultss = [r for r in job.result()]
df = pd.DataFrame(resultss, columns=['api_data'])


def parse_json(json_str):
    """Parse a JSON string, returning None for malformed or non-string input."""
    try:
        return json.loads(json_str)
    except (ValueError, TypeError):
        return None


df['api_data'] = df['api_data'].apply(parse_json)

# --- Transform: unnest the 'results' array of every parsed payload --------
# BUG in original: json.loads() was called a second time on an already-parsed
# dict (TypeError), and flattened_df was overwritten on every loop iteration,
# so only the last row survived.
frames = []
for parsed in df['api_data']:
    if parsed is not None:
        frames.append(pd.DataFrame(parsed['results']))
if frames:
    flattened_df = pd.concat(frames, ignore_index=True)
else:
    flattened_df = pd.DataFrame(columns=['created', 'uuid', 'lastlogin', 'email'])
# Keep the columns in the order the destination table expects.
flattened_df = flattened_df[['created', 'uuid', 'lastlogin', 'email']]


def _cast_dtypes(dataframe, keep_list=None):
    """Cast object ('O') and datetime ('M') columns of *dataframe* to str in place."""
    if keep_list is None:
        keep_list = []
    for column, dtype in dataframe.dtypes.items():
        if column in keep_list:
            continue
        if dtype.kind in ('O', 'M'):
            dataframe[column] = dataframe[column].astype('str')


_cast_dtypes(flattened_df)
print(flattened_df.dtypes)

# --- Write: upload the frame back to Treasure Data ------------------------
# BUG in the workflow variant: pytd's signature is
# load_table_from_dataframe(dataframe, destination, ...). Passing
# (database, table, df) positionally makes pytd treat the database string as
# the DataFrame -> AttributeError: 'str' object has no attribute 'rename'.
td_client = pytd.Client(apikey=apikey, database=database)
td_client.load_table_from_dataframe(
    flattened_df,
    f"{database}.{table}",
    writer='bulk_import',
    if_exists='overwrite',
)
print("Data uploaded to Treasure Data successfully.")
<code>
import tdclient
import pandas as pd
import json
import pytd
import numpy as np
# Set API key and instantiate the read client.
# NOTE(review): avoid hard-coding API keys; read from an env var or secret store.
apikey = '768/6f8d328973hne4d6a3c920okjnl'
client = tdclient.Client(apikey)
job = client.query(
    'ge_src_dev',
    "SELECT cast(lower(api_data) as json) "
    "FROM janra_py",
    type='presto',
)
# BUG in original: [job.status(), job.finished()] only polls once and does not
# block; wait() actually waits until the job completes before fetching results.
job.wait()
# Fetch the results
resultss = [r for r in job.result()]
print(type(resultss))
# Create DataFrame from results
df = pd.DataFrame(resultss, columns=['api_data'])
json_data_column = 'api_data'
# Function to parse JSON strings
def parse_json(json_str):
    """Parse *json_str*, returning None for malformed or non-string input.

    TypeError is caught as well so that None / non-string cells coming out of
    the query result do not crash the whole pipeline.
    """
    try:
        return json.loads(json_str)
    except (ValueError, TypeError):
        return None
df[json_data_column] = df[json_data_column].apply(parse_json)

# Unnest the 'results' array of every successfully parsed payload.
# BUG in original: json.loads() was applied a second time to an already-parsed
# dict (TypeError), and flattened_df was rebuilt -- and overwritten -- on every
# iteration, so only the last row's data survived. Accumulate instead.
_frames = []
for _parsed in df[json_data_column]:
    if _parsed is not None:
        _frames.append(pd.DataFrame(_parsed['results']))
if _frames:
    flattened_df = pd.concat(_frames, ignore_index=True)
else:
    # No parsable rows: keep an empty frame with the expected columns.
    flattened_df = pd.DataFrame(columns=['created', 'uuid', 'lastlogin', 'email'])
# Ensure the columns are correctly ordered
flattened_df = flattened_df[['created', 'uuid', 'lastlogin', 'email']]
# Set up Treasure Data client used for writing back to TD.
client = pytd.Client(apikey)
database = 'ge_src_dev'
table = 'jann_transformed'
destination = f"{database}.{table}"
# BUG in original: `table` is a plain string, so `table.show()` raised
# AttributeError and clobbered the schema object just fetched.
# NOTE(review): pytd.Client may not expose `get_table`; the underlying
# tdclient handle is used here instead -- confirm against your pytd version.
table_schema = client.api_client.table(database, table)
# Print column names and data types.
# NOTE(review): tdclient schema entries are assumed to be [name, type, ...]
# pairs -- verify the exact layout for your client version.
for column in table_schema.schema:
    print(f"Column: {column[0]}, Data Type: {column[1]}")
# Expected schema on the Treasure Data side (all columns stored as strings).
# NOTE(review): this mapping is not referenced below -- documentation only.
treasure_data_schema = dict(
    created='string',
    email='string',
    lastlogin='string',
    uuid='string',
)
# Rename columns to match the Treasure Data schema
#flattened_df = flattened_df.rename(columns={'lastlogin': 'lastLogin'})
# Debug: verify the column names before uploading.
print(flattened_df.columns)
def _cast_dtypes(dataframe, keep_list=None):
if keep_list is None:
keep_list = []
for column, dtype in dataframe.dtypes.items():
kind = dtype.kind
if column in keep_list:
continue
if kind == 'O': # Object type, likely string
try:
dataframe[column] = dataframe[column].astype('str')
except Exception as e:
print(f"Error converting column '{column}' to str: {e}")
elif kind == 'M': # Datetime type
try:
dataframe[column] = dataframe[column].astype('str')
except Exception as e:
print(f"Error converting column '{column}' to str: {e}")
# Stringify unsupported dtypes, then bulk-import the frame into TD.
_cast_dtypes(flattened_df)
print(flattened_df.dtypes)
# (An explicit dtype mapping could be passed here; see the pytd docs if the
# column types ever drift from the table schema.)
client.load_table_from_dataframe(
    flattened_df,
    destination,
    writer='bulk_import',
    if_exists='overwrite',
)
print("Data uploaded to Treasure Data successfully.")
</code>
import tdclient
import pandas as pd
import json
import pytd

# NOTE(review): avoid hard-coding API keys; read from an env var or secret store.
apikey = '768/6f8d328973hne4d6a3c920okjnl'
database = 'ge_src_dev'
table = 'jann_transformed'

# --- Read: run a Presto query against Treasure Data -----------------------
client = tdclient.Client(apikey)
job = client.query(
    database,
    "SELECT cast(lower(api_data) as json) FROM janra_py",
    type='presto',
)
# BUG in original: [job.status(), job.finished()] polls once and does not wait.
job.wait()
resultss = [r for r in job.result()]
df = pd.DataFrame(resultss, columns=['api_data'])


def parse_json(json_str):
    """Parse a JSON string, returning None for malformed or non-string input."""
    try:
        return json.loads(json_str)
    except (ValueError, TypeError):
        return None


df['api_data'] = df['api_data'].apply(parse_json)

# --- Transform: unnest the 'results' array of every parsed payload --------
# BUG in original: json.loads() was called a second time on an already-parsed
# dict (TypeError), and flattened_df was overwritten on every loop iteration,
# so only the last row survived.
frames = []
for parsed in df['api_data']:
    if parsed is not None:
        frames.append(pd.DataFrame(parsed['results']))
if frames:
    flattened_df = pd.concat(frames, ignore_index=True)
else:
    flattened_df = pd.DataFrame(columns=['created', 'uuid', 'lastlogin', 'email'])
# Keep the columns in the order the destination table expects.
flattened_df = flattened_df[['created', 'uuid', 'lastlogin', 'email']]


def _cast_dtypes(dataframe, keep_list=None):
    """Cast object ('O') and datetime ('M') columns of *dataframe* to str in place."""
    if keep_list is None:
        keep_list = []
    for column, dtype in dataframe.dtypes.items():
        if column in keep_list:
            continue
        if dtype.kind in ('O', 'M'):
            dataframe[column] = dataframe[column].astype('str')


_cast_dtypes(flattened_df)
print(flattened_df.dtypes)

# --- Write: upload the frame back to Treasure Data ------------------------
# BUG in the workflow variant: pytd's signature is
# load_table_from_dataframe(dataframe, destination, ...). Passing
# (database, table, df) positionally makes pytd treat the database string as
# the DataFrame -> AttributeError: 'str' object has no attribute 'rename',
# which is exactly the traceback shown below.
td_client = pytd.Client(apikey=apikey, database=database)
td_client.load_table_from_dataframe(
    flattened_df,
    f"{database}.{table}",
    writer='bulk_import',
    if_exists='overwrite',
)
print("Data uploaded to Treasure Data successfully.")
I even tried running it in a Treasure Data workflow with some changes, and again faced issues.
Here is one sample error; while trying to resolve it, new errors keep appearing.
mod = __import__(".".join(fragments), fromlist=[method_name])
<code>from File "/home/td-user/python_script/janrain_script.py", line 117, in <module>
from File "/home/td-user/python_script/janrain_script.py", line 69, in task
client.load_table_from_dataframe(database, table, flattened_df, if_exists='overwrite')
from File "/usr/local/lib/python3.9/site-packages/pytd/client.py", line 339, in load_table_from_dataframe
destination.import_dataframe(dataframe, writer, if_exists, **kwargs)
from File "/usr/local/lib/python3.9/site-packages/pytd/table.py", line 115, in import_dataframe
dataframe = dataframe.rename(
from AttributeError: 'str' object has no attribute 'rename'
<code>from File "/home/td-user/python_script/janrain_script.py", line 117, in <module>
task()
from File "/home/td-user/python_script/janrain_script.py", line 69, in task
client.load_table_from_dataframe(database, table, flattened_df, if_exists='overwrite')
from File "/usr/local/lib/python3.9/site-packages/pytd/client.py", line 339, in load_table_from_dataframe
destination.import_dataframe(dataframe, writer, if_exists, **kwargs)
from File "/usr/local/lib/python3.9/site-packages/pytd/table.py", line 115, in import_dataframe
dataframe = dataframe.rename(
from AttributeError: 'str' object has no attribute 'rename'
</code>
from File "/home/td-user/python_script/janrain_script.py", line 117, in <module>
task()
from File "/home/td-user/python_script/janrain_script.py", line 69, in task
client.load_table_from_dataframe(database, table, flattened_df, if_exists='overwrite')
from File "/usr/local/lib/python3.9/site-packages/pytd/client.py", line 339, in load_table_from_dataframe
destination.import_dataframe(dataframe, writer, if_exists, **kwargs)
from File "/usr/local/lib/python3.9/site-packages/pytd/table.py", line 115, in import_dataframe
dataframe = dataframe.rename(
from AttributeError: 'str' object has no attribute 'rename'
This is the first time I'm trying to upload data to Treasure Data — please share your ideas and suggestions. Thanks!