I want to create a table partitioned by date based on an integer timestamp value:
<code>spark.sql("""
CREATE TABLE my_catalog.test_iceberg.t1 (
    timestamp_integer int,
    customer_id string,
    marketplace_id int,
    order_qty int
)
PARTITIONED BY (date(timestamp_integer))
LOCATION 's3://MY_BUCKET/emr/T1/'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format-version'='2',
    'format'='parquet',
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
""")
</code>
However, EMR/Spark throws the following exception:
<code>org.apache.iceberg.exceptions.ValidationException: Invalid source type int for transform: day
</code>
The timestamp_integer column stores the timestamp in milliseconds. Is there any trick or approach I can use to partition based on this integer value?
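For context, I understand Iceberg's day/date transform is only defined for date and timestamp source columns, which is why the int column is rejected. A variant like the following, with the value ingested as a real timestamp (the ts column and t1_ts table names are just illustrative), does validate:
<code>spark.sql("""
CREATE TABLE my_catalog.test_iceberg.t1_ts (
    ts timestamp,
    customer_id string,
    marketplace_id int,
    order_qty int
)
PARTITIONED BY (days(ts))  -- days() accepts only date/timestamp sources
LOCATION 's3://MY_BUCKET/emr/T1_TS/'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format-version'='2',
    'format'='parquet'
)
""")
</code>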
As a workaround (based on a comment in an Iceberg GitHub issue), I am partitioning with truncate(timestamp_integer, 86400000): the truncate transform groups the millisecond values into days (86,400,000 milliseconds per day). I'm not sure whether this is the best or proper way to do it in Iceberg, though. Any thoughts?
<code>spark.sql("""
CREATE TABLE my_catalog.test_iceberg.t1 (
    timestamp_integer int,
    customer_id string,
    marketplace_id int,
    order_qty int
)
PARTITIONED BY (truncate(timestamp_integer, 86400000))
LOCATION 's3://MY_BUCKET/emr/T1/'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format-version'='2',
    'format'='parquet',
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
""")
</code>