I’m trying to parse XML strings stored in a table column, with one XML document in each row.
What I have:
1. Spark DataFrame
The DataFrame is read from a Parquet file and has this schema:
root
|-- Id: decimal(18,0) (nullable = true)
|-- OrderId: string (nullable = true)
|-- Action: string (nullable = true)
|-- TrayportDateTimeUTC: timestamp (nullable = true)
|-- GV8Data: string (nullable = true)
2. XML sample
This is a sample of one XML string (one GV8Data value):
<GV8APIDATA
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="gv8api-trayport-com">
<ORDER EngineID="1" PersistentOrderID="60076044" OrderID="52389917" OldEngineID="0" OldOrderID="0" Action="Remove" DateTime="2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart="0" Price="169.50" Volume="1" HiddenVolume="0" PriceDelta="0.00" Side="Ask" Status="Withheld" Company="Company" CompanyID="278" Broker="ICAP" BrokerID="4" User="User" UserID="1910" Trader="User" TraderID="1910" OrderType="GoodTillCancelled" AllOrNone="false" CounterPartyOk="false" SystemRank="55936541" ImpliedType="None" IsTradable="No" OrderDealt="false" TradingCapacity="DEAL" ExecutionMaker="19850428" DerivativeIndicator="false" DEA="false" LiquidityProvision="true" ProductClassification="RM - MIFID" IsMarketData="true" IsOwnData="true">
<INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX" FirstSequenceID="10000104" SeqSpan="Single" FirstSequenceItemID="240" SecondSequenceItemID="0" FirstSequenceItemName="Dec-23" SecondSequenceItemName="" TermFormatID="2906593310" ExternalInstID="10641712"/>
</ORDER>
</GV8APIDATA>
As far as I can tell, the XML is well-formed.
My attempts
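To check well-formedness across many rows rather than one sample, a plain-Python sketch using only the standard library's xml.etree.ElementTree can flag values that are null or fail to parse. The helper name find_bad_rows is hypothetical, and the sample is abridged. The input list could come from something like `[r.GV8Data for r in df.select("GV8Data").limit(1000).collect()]`. The sketch also shows that the default namespace (xmlns="gv8api-trayport-com") is attached to every tag name.

```python
import xml.etree.ElementTree as ET

# Abridged copy of the sample above (most ORDER/INSTSPECIFIER attributes left out).
SAMPLE = """<GV8APIDATA
 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns="gv8api-trayport-com">
 <ORDER EngineID="1" OrderID="52389917" Action="Remove" Price="169.50">
  <INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX"/>
 </ORDER>
</GV8APIDATA>"""


def find_bad_rows(xml_strings):
    """Return (index, reason) pairs for values that are null or not well-formed XML."""
    bad = []
    for i, text in enumerate(xml_strings):
        if text is None:
            bad.append((i, "null value"))
            continue
        try:
            ET.fromstring(text)
        except ET.ParseError as exc:
            bad.append((i, str(exc)))
    return bad


root = ET.fromstring(SAMPLE)
# ElementTree folds the default namespace into every tag name (Clark notation).
print(root.tag)                       # {gv8api-trayport-com}GV8APIDATA
print([child.tag for child in root])  # ['{gv8api-trayport-com}ORDER']
print(find_bad_rows([SAMPLE, None, "<ORDER>"]))
```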
1. Spark SQL quick test
Using the from_xml and schema_of_xml functions on a single sample:
%sql
SELECT from_xml('<GV8APIDATA
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="gv8api-trayport-com">
<ORDER EngineID="1" PersistentOrderID="60076044" OrderID="52389917" OldEngineID="0" OldOrderID="0" Action="Remove" DateTime="2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart="0" Price="169.50" Volume="1" HiddenVolume="0" PriceDelta="0.00" Side="Ask" Status="Withheld" Company="Company" CompanyID="278" Broker="ICAP" BrokerID="4" User="User" UserID="1910" Trader="User" TraderID="1910" OrderType="GoodTillCancelled" AllOrNone="false" CounterPartyOk="false" SystemRank="55936541" ImpliedType="None" IsTradable="No" OrderDealt="false" TradingCapacity="DEAL" ExecutionMaker="19850428" DerivativeIndicator="false" DEA="false" LiquidityProvision="true" ProductClassification="RM - MIFID" IsMarketData="true" IsOwnData="true">
<INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX" FirstSequenceID="10000104" SeqSpan="Single" FirstSequenceItemID="240" SecondSequenceItemID="0" FirstSequenceItemName="Dec-23" SecondSequenceItemName="" TermFormatID="2906593310" ExternalInstID="10641712"/>
</ORDER>
</GV8APIDATA>', schema_of_xml('<GV8APIDATA
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="gv8api-trayport-com">
<ORDER EngineID="1" PersistentOrderID="60076044" OrderID="52389917" OldEngineID="0" OldOrderID="0" Action="Remove" DateTime="2023-10-14T00:16:33.0467795Z" DateTimeNanoSecondsPart="0" Price="169.50" Volume="1" HiddenVolume="0" PriceDelta="0.00" Side="Ask" Status="Withheld" Company="Company" CompanyID="278" Broker="ICAP" BrokerID="4" User="User" UserID="1910" Trader="User" TraderID="1910" OrderType="GoodTillCancelled" AllOrNone="false" CounterPartyOk="false" SystemRank="55936541" ImpliedType="None" IsTradable="No" OrderDealt="false" TradingCapacity="DEAL" ExecutionMaker="19850428" DerivativeIndicator="false" DEA="false" LiquidityProvision="true" ProductClassification="RM - MIFID" IsMarketData="true" IsOwnData="true">
<INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX" FirstSequenceID="10000104" SeqSpan="Single" FirstSequenceItemID="240" SecondSequenceItemID="0" FirstSequenceItemName="Dec-23" SecondSequenceItemName="" TermFormatID="2906593310" ExternalInstID="10641712"/>
</ORDER>
</GV8APIDATA>')) test
This returns more or less the expected result.
2. Spark SQL extensive test
Using from_xml and schema_of_xml again, this time on the GV8Data column of the Parquet file:
SELECT from_xml(GV8Data, schema_of_xml('STRUCT<ORDER: STRUCT<INSTSPECIFIER: STRUCT<_ExternalInstID: BIGINT, _FirstSequenceID: BIGINT, _FirstSequenceItemID: BIGINT, _FirstSequenceItemName: STRING, _InstID: BIGINT, _InstName: STRING, _SecondSequenceItemID: BIGINT, _SecondSequenceItemName: STRING, _SeqSpan: STRING, _TermFormatID: BIGINT>, _Action: STRING, _AllOrNone: BOOLEAN, _Broker: STRING, _BrokerID: BIGINT, _Company: STRING, _CompanyID: BIGINT, _CounterPartyOk: BOOLEAN, _DEA: BOOLEAN, _DateTime: TIMESTAMP, _DateTimeNanoSecondsPart: BIGINT, _DerivativeIndicator: BOOLEAN, _EngineID: BIGINT, _ExecutionMaker: BIGINT, _HiddenVolume: BIGINT, _ImpliedType: STRING, _IsMarketData: BOOLEAN, _IsOwnData: BOOLEAN, _IsTradable: STRING, _LiquidityProvision: BOOLEAN, _OldEngineID: BIGINT, _OldOrderID: BIGINT, _OrderDealt: BOOLEAN, _OrderID: BIGINT, _OrderType: STRING, _PersistentOrderID: BIGINT, _Price: DOUBLE, _PriceDelta: DOUBLE, _ProductClassification: STRING, _Side: STRING, _Status: STRING, _SystemRank: BIGINT, _Trader: STRING, _TraderID: BIGINT, _TradingCapacity: STRING, _User: STRING, _UserID: BIGINT, _Volume: BIGINT>, _xmlns: STRING, `_xmlns:xsd`: STRING, `_xmlns:xsi`: STRING>')) test
FROM PARQUET.`path_to_file/file.parquet`
This is where things start to go wrong.
3. PySpark test
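In the extensive test, schema_of_xml receives the STRUCT<...> DDL string rather than an XML sample. Because schema_of_xml infers a schema from an XML document, it probably does not return the schema written inside that string. However, from_xml also accepts a DDL string directly as its second argument. Below is a minimal standard-library sketch that derives such a DDL string from a sample. The helper names ddl_for and local_name are hypothetical. Every leaf is typed STRING, so nothing depends on type inference. For example, the 7-digit fractional seconds in DateTime may not parse cleanly as TIMESTAMP, since Spark timestamps stop at microseconds. That is an assumption worth checking on your data.

```python
import xml.etree.ElementTree as ET

# Abridged copy of the sample from the question.
SAMPLE = """<GV8APIDATA
 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns="gv8api-trayport-com">
 <ORDER EngineID="1" OrderID="52389917" Action="Remove" Price="169.50">
  <INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX"/>
 </ORDER>
</GV8APIDATA>"""


def local_name(tag):
    """Drop the '{namespace}' part ElementTree adds, e.g. '{gv8api-trayport-com}ORDER' -> 'ORDER'."""
    return tag.rsplit("}", 1)[-1]


def ddl_for(element, attribute_prefix="_"):
    """Build an all-STRING Spark-style DDL type describing the content of `element`.

    Attributes become `<prefix><name>` fields and child elements become nested
    STRUCTs; a child with neither attributes nor children is typed STRING.
    Repeated children (arrays) and mixed text+attribute content are NOT handled.
    """
    if not element.attrib and len(element) == 0:
        return "STRING"
    fields = [f"`{attribute_prefix}{local_name(name)}`: STRING" for name in element.attrib]
    for child in element:
        fields.append(f"`{local_name(child.tag)}`: {ddl_for(child, attribute_prefix)}")
    return "STRUCT<" + ", ".join(fields) + ">"


ddl = ddl_for(ET.fromstring(SAMPLE))
print(ddl)
```

The printed string could then be used as the literal second argument, e.g. SELECT from_xml(GV8Data, 'STRUCT<`ORDER`: STRUCT<...>>') AS test FROM PARQUET.`path_to_file/file.parquet`. Individual fields can be cast to numeric or timestamp types afterwards.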
from pyspark.sql.functions import from_xml  # from_xml is used on the last line
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, TimestampType
# INSTSPECIFIER
inst_specifier_schema = StructType([
    StructField("InstID", StringType(), True),
    StructField("InstName", StringType(), True),
    StructField("FirstSequenceID", StringType(), True),
    StructField("SeqSpan", StringType(), True),
    StructField("FirstSequenceItemID", StringType(), True),
    StructField("SecondSequenceItemID", StringType(), True),
    StructField("FirstSequenceItemName", StringType(), True),
    StructField("SecondSequenceItemName", StringType(), True),
    StructField("TermFormatID", StringType(), True),
    StructField("ExternalInstID", StringType(), True)
])
# ORDER
order_schema = StructType([
    StructField("EngineID", StringType(), True),
    StructField("PersistentOrderID", StringType(), True),
    StructField("OrderID", StringType(), True),
    StructField("OldEngineID", StringType(), True),
    StructField("OldOrderID", StringType(), True),
    StructField("Action", StringType(), True),
    StructField("DateTime", StringType(), True),
    StructField("DateTimeNanoSecondsPart", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("Volume", StringType(), True),
    StructField("HiddenVolume", StringType(), True),
    StructField("PriceDelta", FloatType(), True),
    StructField("Side", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Company", StringType(), True),
    StructField("CompanyID", StringType(), True),
    StructField("Broker", StringType(), True),
    StructField("BrokerID", StringType(), True),
    StructField("User", StringType(), True),
    StructField("UserID", StringType(), True),
    StructField("Trader", StringType(), True),
    StructField("TraderID", StringType(), True),
    StructField("OrderType", StringType(), True),
    StructField("AllOrNone", BooleanType(), True),
    StructField("CounterPartyOk", BooleanType(), True),
    StructField("SystemRank", StringType(), True),
    StructField("ImpliedType", StringType(), True),
    StructField("IsTradable", StringType(), True),
    StructField("OrderDealt", BooleanType(), True),
    StructField("TradingCapacity", StringType(), True),
    StructField("ExecutionMaker", StringType(), True),
    StructField("DerivativeIndicator", BooleanType(), True),
    StructField("DEA", BooleanType(), True),
    StructField("LiquidityProvision", BooleanType(), True),
    StructField("ProductClassification", StringType(), True),
    StructField("IsMarketData", BooleanType(), True),
    StructField("IsOwnData", BooleanType(), True),
    StructField("INSTSPECIFIER", inst_specifier_schema, True)
])
# root
gv8api_data_schema = StructType([
    StructField("ORDER", order_schema, True)
])
df = spark.read.parquet("path_to_file/file.parquet")
parsed_df = df.withColumn("parsed_xml", from_xml("GV8Data",schema=order_schema))
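With Spark's XML support, the root element of each string (here GV8APIDATA) is treated as the row itself. Attributes are mapped to fields whose names carry the attributePrefix, which is "_" by default. This matches the inferred DDL in the SQL tests, with names such as _Action and _Price. Under that assumption, the order_schema passed to from_xml above would be matched against the root's content. That content has an ORDER child but none of order_schema's unprefixed names, so its fields would likely come back null. The sketch below only mimics that layout with the standard library; it is not Spark's own code, and the helper to_row is hypothetical. It shows which names a matching schema needs at each level.

```python
import xml.etree.ElementTree as ET

# Abridged copy of the sample from the question.
SAMPLE = """<GV8APIDATA
 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns="gv8api-trayport-com">
 <ORDER EngineID="1" OrderID="52389917" Action="Remove" Price="169.50">
  <INSTSPECIFIER InstID="10641712" InstName="Germany Peaks EEX"/>
 </ORDER>
</GV8APIDATA>"""


def local_name(tag):
    """Drop the '{namespace}' part that ElementTree adds to tag and attribute names."""
    return tag.rsplit("}", 1)[-1]


def to_row(element, attribute_prefix="_"):
    """Illustrate Spark's default XML-to-struct layout for one document (illustration only).

    The element passed in plays the role of the row: its attributes become
    '<prefix><name>' keys and each child element becomes a nested dict keyed by
    its namespace-free tag name. Repeated children would overwrite each other here.
    """
    row = {attribute_prefix + local_name(k): v for k, v in element.attrib.items()}
    for child in element:
        row[local_name(child.tag)] = to_row(child, attribute_prefix)
    return row


row = to_row(ET.fromstring(SAMPLE))
print(list(row))                                   # ['ORDER']
print(row["ORDER"]["_Price"])                      # 169.50
print(row["ORDER"]["INSTSPECIFIER"]["_InstName"])  # Germany Peaks EEX
```

Following that layout, a PySpark schema would use gv8api_data_schema at the top level, wrapping the ORDER struct. Every attribute field would also be renamed with the prefix, e.g. StructField("_EngineID", StringType(), True) and StructField("_InstID", StringType(), True). PySpark's from_xml lives in pyspark.sql.functions and only exists in recent releases (Spark 4.0, or Databricks runtimes with native XML support), so the cluster version is worth checking too.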
My results
I get this obscure error, with no further details:
Error in callback <bound method UserNamespaceCommandHook.post_run_cell of <dbruntime.DatasetInfo.UserNamespaceCommandHook object at 0x7fe060e563e0>> (for post_run_cell):
What am I missing?