I am using SQLAlchemy with a PostgreSQL database hosted on AWS. One particular table is extremely slow to load: other tables load in seconds, but this one can take 10 hours to load ~400 records. For most of the other tables I use `session.bulk_insert_mappings`. It seemed like I could not use it on this table because of the Identity column, but I could be wrong. The following is a generic version of what I have and how I am doing the insert. The table that has the problem is `Output`. I also have a local database that does not seem to have the problem. Connections to the database on AWS have to go through a proxy. Any ideas on how to speed it up?
class MyKey(Base):
    """ORM mapping for the ``my_key`` table.

    Each row has a database-generated identity primary key (``id``), a
    ``year``, five text key parts (``key1``..``key5``), a ``level``, a
    ``delete_key`` flag and last-modified audit columns.  ``Output``,
    ``MetricOutput`` and ``MetricDetailOut`` rows reference it through
    the relationships declared at the bottom of the class.
    """

    __tablename__ = 'my_key'
    __table_args__ = {'schema': SCHEMA}

    # Primary key: identity column declared with always=True, starting at 0.
    id = Column(
        BigInteger,
        Identity(always=True, start=0, increment=1, minvalue=0, cache=1),
        primary_key=True,
    )

    year = Column(SmallInteger, nullable=False)
    key1 = Column(Text, nullable=False)
    key2 = Column(Text, nullable=False)
    key3 = Column(Text, nullable=False)
    key4 = Column(Text, nullable=False)
    key5 = Column(Text, nullable=False)
    level = Column(Text, nullable=False)
    delete_key = Column(Boolean, nullable=False)

    # Audit columns.
    last_modified_by = Column(Text, nullable=False)
    last_modified_time = Column(DateTime(timezone=True), nullable=False)

    # Reverse sides of the relationships declared on the child models.
    children = relationship('Output', back_populates='parent')
    metric_children = relationship('MetricOutput', back_populates='key_parent')
    metric_detail_children = relationship('MetricDetailOut', back_populates='key_parent')

    def __repr__(self):
        """Return a debugging string listing every column value."""
        fields = ", ".join([
            f"id={self.id}",
            f"year={self.year}",
            f"key1={self.key1}",
            f"key2='{self.key2}'",
            f"key3='{self.key3}'",
            f"key4='{self.key4}'",
            f"key5='{self.key5}'",
            f"level='{self.level}'",
            f"delete_key={self.delete_key}",
            f"last_modified_by='{self.last_modified_by}'",
            f"last_modified_time={self.last_modified_time}",
        ])
        return f"<MyKey({fields})>"
class Output(Base):
    """ORM mapping for the ``output`` table.

    ``id`` is a non-primary foreign key to ``my_key.id``; ``id2`` is the
    database-generated identity primary key.  ``MetricOutput`` and
    ``MetricDetailOut`` rows reference ``id2``.
    """

    __tablename__ = 'output'
    __table_args__ = {'schema': SCHEMA}

    # Link to the owning MyKey row (not part of the primary key).
    id = Column(BigInteger, ForeignKey(f"{SCHEMA}.my_key.id"), primary_key=False)
    # Primary key: identity column declared with always=True, starting at 0.
    id2 = Column(
        BigInteger,
        Identity(always=True, start=0, increment=1, minvalue=0, cache=1),
        primary_key=True,
    )

    value1 = Column(Text, nullable=False)
    value2 = Column(Text, nullable=False)
    # NOTE(review): the original assigned ``value3`` twice (Text, then Float).
    # The second assignment replaced the first, so only the Float column was
    # ever mapped; the dead Text definition is removed here.  If two distinct
    # columns were intended, the Text one needs its own attribute name.
    value3 = Column(Float, nullable=False)
    value4 = Column(Float, nullable=False)
    value5 = Column(Text, nullable=False)
    value6 = Column(Text, nullable=False)
    value7 = Column(SmallInteger, nullable=False)
    value8 = Column(Text, nullable=False)
    value9 = Column(Numeric(10, 2), nullable=False)
    value10 = Column(Numeric(10, 2), nullable=False)
    value11 = Column(Numeric(10, 2), nullable=False)
    value12 = Column(Numeric(7, 2), nullable=False)
    value13 = Column(Numeric(13, 4), nullable=False)
    value14 = Column(Numeric(13, 4), nullable=False)
    value15 = Column(Numeric(13, 4), nullable=False)
    value16 = Column(Numeric(13, 4), nullable=False)
    value17 = Column(Numeric(13, 4), nullable=False)
    value18 = Column(Integer, nullable=False)
    delete = Column(Boolean, nullable=False)

    # Audit columns.
    last_modified_by = Column(Text, nullable=False)
    last_modified_time = Column(DateTime(timezone=True), nullable=False)

    parent = relationship('MyKey', back_populates='children')
    children = relationship('MetricOutput', back_populates='parent')
    detail_children = relationship('MetricDetailOut', back_populates='parent')

    def __repr__(self):
        """Return a debugging string listing every column value once."""
        return (f"<Output(id={self.id}, id2={self.id2}, value1='{self.value1}', "
                f"value2='{self.value2}', "
                f"value3={self.value3}, value4={self.value4}, "
                f"value5='{self.value5}', value6='{self.value6}', "
                f"value7={self.value7}, value8='{self.value8}', "
                f"value9={self.value9}, value10={self.value10}, "
                f"value11={self.value11}, value12={self.value12}, "
                f"value13={self.value13}, value14={self.value14}, "
                f"value15={self.value15}, value16={self.value16}, "
                f"value17={self.value17}, value18={self.value18}, "
                f"delete={self.delete}, "
                f"last_modified_by='{self.last_modified_by}', "
                f"last_modified_time={self.last_modified_time})>")
class MetricOutput(Base):
    """ORM mapping for the ``metric_output`` table.

    The primary key is the composite ``(id, id2, metric)``: ``id``
    references ``my_key.id`` and ``id2`` references ``output.id2``.
    ``value`` stores an array of ``Numeric(15, 4)`` numbers.
    """

    __tablename__ = 'metric_output'
    __table_args__ = {'schema': SCHEMA}

    id = Column(BigInteger, ForeignKey(f"{SCHEMA}.my_key.id"), primary_key=True)
    id2 = Column(BigInteger, ForeignKey(f"{SCHEMA}.output.id2"), primary_key=True)
    metric = Column(Text, primary_key=True)
    value = Column(ARRAY(Numeric(15, 4)), nullable=False)
    delete = Column(Boolean, nullable=False)

    # Audit columns.
    last_modified_by = Column(Text, nullable=False)
    last_modified_time = Column(DateTime(timezone=True), nullable=False)

    key_parent = relationship('MyKey', back_populates='metric_children')
    parent = relationship('Output', back_populates='children')

    def __repr__(self):
        """Return a debugging string listing every column value."""
        # The flag column on this model is ``delete``; ``delete_key`` exists
        # only on MyKey, so reading it here raised AttributeError.
        return (f"<MetricOutput(id={self.id}, id2={self.id2}, "
                f"metric={self.metric}, value={self.value}, "
                f"delete={self.delete}, "
                f"last_modified_by='{self.last_modified_by}', "
                f"last_modified_time={self.last_modified_time})>")
class MetricDetailOut(Base):
    """ORM mapping for the ``metric_detail_out`` table.

    The primary key is the composite ``(id, id2, detail, metric)``:
    ``id`` references ``my_key.id`` and ``id2`` references ``output.id2``.
    ``value`` stores an array of ``Numeric(15, 4)`` numbers.
    """

    __tablename__ = 'metric_detail_out'
    __table_args__ = {'schema': SCHEMA}

    id = Column(BigInteger, ForeignKey(f"{SCHEMA}.my_key.id"), primary_key=True)
    id2 = Column(BigInteger, ForeignKey(f"{SCHEMA}.output.id2"), primary_key=True)
    detail = Column(BigInteger, primary_key=True)
    metric = Column(Text, primary_key=True)
    value = Column(ARRAY(Numeric(15, 4)), nullable=False)
    delete = Column(Boolean, nullable=False)

    # Audit columns.
    last_modified_by = Column(Text, nullable=False)
    last_modified_time = Column(DateTime(timezone=True), nullable=False)

    key_parent = relationship('MyKey', back_populates='metric_detail_children')
    parent = relationship('Output', back_populates='detail_children')

    def __repr__(self):
        """Return a debugging string listing every column value."""
        # Closes the parenthesis opened after the class name, matching the
        # "<Name(...)>" shape produced by MyKey.__repr__.
        return (f"<MetricDetailOut(id={self.id}, id2={self.id2}, "
                f"detail={self.detail}, "
                f"metric={self.metric}, value={self.value}, "
                f"delete={self.delete}, "
                f"last_modified_by='{self.last_modified_by}', "
                f"last_modified_time={self.last_modified_time})>")
# ``create_negine`` was a typo that made this import fail with ImportError.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Engine for the database at ``database_uri`` (defined outside this snippet).
db_engine = create_engine(
    database_uri,
    pool_size=10,          # connections kept open in the pool
    max_overflow=2,        # extra connections allowed beyond pool_size
    pool_recycle=300,      # replace connections older than 300 seconds
    pool_use_lifo=True,    # hand out the most recently returned connection first
    pool_pre_ping=True,    # test a connection for liveness on checkout
    # Passed through to the DBAPI connect() call; presumably libpq TCP
    # keepalive settings -- confirm against the driver in use.
    connect_args={
        'keepalives': 1,
        'keepalives_idle': 30,
        'keepalives_interval': 10,
        'keepalives_count': 5,
    },
    echo=False,
)

# Factory for Session objects bound to the engine above.
db_session_maker = sessionmaker(bind=db_engine)
# Build one ``Output`` object per dataframe row and stage them in a session.
# NOTE(review): ``self`` and ``dataframe`` are not defined in the visible
# snippet; this presumably runs inside a method -- confirm.
db_session = self.db_session_maker()
table = Output

# Columns the client has to supply: every mapped column except identity
# columns, whose values are generated by the database.
col_list = [
    col.key
    for col in table.__table__.columns
    if not isinstance(col.server_default, Identity)
]

# Convert the frame to row dicts in a single pass instead of one
# ``dataframe.at`` lookup per cell.  An empty frame yields no rows.
# NOTE(review): recent pandas versions return plain Python scalars from
# ``to_dict`` rather than numpy scalars -- confirm for the pinned version.
if dataframe.empty:
    records = []
else:
    records = dataframe[col_list].to_dict(orient="records")

new_table_list = [table(**record) for record in records]
db_session.add_all(new_table_list)

# NOTE(review): no flush()/commit() is visible here; the rows are only sent
# to the database once the session is flushed or committed.
# NOTE(review): if the ORM objects are not needed afterwards,
# ``db_session.bulk_insert_mappings(table, records)`` should also work here,
# because ``records`` already omits the identity column -- verify.