Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from alembic import context, op
from sqlalchemy.dialects.mysql import MEDIUMTEXT

from airflow.migrations.db_types import TIMESTAMP, StringID
from airflow.migrations.db_types import StringID

# revision identifiers, used by Alembic.
revision = "888b59e02a5b"
Expand All @@ -50,60 +50,11 @@ def upgrade():
op.execute("UPDATE connection SET is_extra_encrypted = FALSE WHERE is_extra_encrypted IS NULL")

op.execute("UPDATE dag SET is_paused = FALSE WHERE is_paused IS NULL")
op.execute("UPDATE dag SET has_import_errors = FALSE WHERE has_import_errors IS NULL")

op.execute(
"""
INSERT INTO log_template (filename, elasticsearch_id, created_at)
SELECT
'dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log',
'{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}',
CURRENT_TIMESTAMP
WHERE NOT EXISTS (SELECT 1 FROM log_template)
"""
)

op.execute("UPDATE dag_run SET state = 'queued' WHERE state IS NULL")
op.execute(
"""
UPDATE dag_run
SET log_template_id = (SELECT max(id) FROM log_template)
WHERE log_template_id IS NULL
"""
)
op.execute(
"""
UPDATE dag_run
SET updated_at = COALESCE(end_date, start_date, queued_at, logical_date, CURRENT_TIMESTAMP)
WHERE updated_at IS NULL
"""
)

op.execute("UPDATE log SET dttm = COALESCE(logical_date, CURRENT_TIMESTAMP) WHERE dttm IS NULL")

op.execute("UPDATE slot_pool SET slots = 0 WHERE slots IS NULL")
if dialect_name == "mysql":
op.execute(
"UPDATE slot_pool SET pool = CONCAT('__airflow_pool_fix_888b59e02a5b_', id) WHERE pool IS NULL"
)
else:
op.execute("UPDATE slot_pool SET pool = '__airflow_pool_fix_888b59e02a5b_' || id WHERE pool IS NULL")

op.execute("UPDATE task_instance SET try_number = 0 WHERE try_number IS NULL")
op.execute("UPDATE task_instance SET max_tries = -1 WHERE max_tries IS NULL")
op.execute("UPDATE task_instance SET hostname = '' WHERE hostname IS NULL")
op.execute("UPDATE task_instance SET unixname = '' WHERE unixname IS NULL")
op.execute("UPDATE task_instance SET queue = 'default' WHERE queue IS NULL")
op.execute("UPDATE task_instance SET priority_weight = 1 WHERE priority_weight IS NULL")
op.execute(
"UPDATE task_instance SET custom_operator_name = COALESCE(operator, '') WHERE custom_operator_name IS NULL"
)
if dialect_name == "postgresql":
op.execute(
"UPDATE task_instance SET executor_config = decode('80047d942e', 'hex') WHERE executor_config IS NULL"
)
else:
op.execute("UPDATE task_instance SET executor_config = x'80047d942e' WHERE executor_config IS NULL")

op.execute("UPDATE variable SET val = '' WHERE val IS NULL")
op.execute("UPDATE variable SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
Expand All @@ -120,36 +71,15 @@ def upgrade():

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=False)
batch_op.alter_column(
"has_import_errors",
existing_type=sa.BOOLEAN(),
nullable=False,
existing_server_default=sa.text("(false)"),
)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("state", existing_type=StringID(length=50), nullable=False)
batch_op.alter_column("log_template_id", existing_type=sa.INTEGER(), nullable=False)
batch_op.alter_column("updated_at", existing_type=TIMESTAMP(), nullable=False)

with op.batch_alter_table("log", schema=None) as batch_op:
batch_op.alter_column("dttm", existing_type=TIMESTAMP(), nullable=False)

with op.batch_alter_table("slot_pool", schema=None) as batch_op:
batch_op.alter_column("pool", existing_type=StringID(length=256), nullable=False)
batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=False)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=False)
batch_op.alter_column(
"max_tries", existing_type=sa.INTEGER(), nullable=False, existing_server_default=sa.text("'-1'")
)
batch_op.alter_column("hostname", existing_type=StringID(length=1000), nullable=False)
batch_op.alter_column("unixname", existing_type=StringID(length=1000), nullable=False)
batch_op.alter_column("queue", existing_type=StringID(length=256), nullable=False)
batch_op.alter_column("priority_weight", existing_type=sa.INTEGER(), nullable=False)
batch_op.alter_column("custom_operator_name", existing_type=StringID(length=1000), nullable=False)
batch_op.alter_column("executor_config", existing_type=sa.BLOB(), nullable=False)

with op.batch_alter_table("variable", schema=None) as batch_op:
batch_op.alter_column("key", existing_type=StringID(length=250), nullable=False)
Expand All @@ -174,36 +104,15 @@ def downgrade():
batch_op.alter_column("key", existing_type=StringID(length=250), nullable=True)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("executor_config", existing_type=sa.BLOB(), nullable=True)
batch_op.alter_column("custom_operator_name", existing_type=StringID(length=1000), nullable=True)
batch_op.alter_column("priority_weight", existing_type=sa.INTEGER(), nullable=True)
batch_op.alter_column("queue", existing_type=StringID(length=256), nullable=True)
batch_op.alter_column("unixname", existing_type=StringID(length=1000), nullable=True)
batch_op.alter_column("hostname", existing_type=StringID(length=1000), nullable=True)
batch_op.alter_column(
"max_tries", existing_type=sa.INTEGER(), nullable=True, existing_server_default=sa.text("'-1'")
)
batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=True)

with op.batch_alter_table("slot_pool", schema=None) as batch_op:
batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=True)
batch_op.alter_column("pool", existing_type=StringID(length=256), nullable=True)

with op.batch_alter_table("log", schema=None) as batch_op:
batch_op.alter_column("dttm", existing_type=TIMESTAMP(), nullable=True)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("updated_at", existing_type=TIMESTAMP(), nullable=True)
batch_op.alter_column("log_template_id", existing_type=sa.INTEGER(), nullable=True)
batch_op.alter_column("state", existing_type=StringID(length=50), nullable=True)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column(
"has_import_errors",
existing_type=sa.BOOLEAN(),
nullable=True,
existing_server_default=sa.text("(false)"),
)
batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=True)

with op.batch_alter_table("connection", schema=None) as batch_op:
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class DagModel(Base):
max_consecutive_failed_dag_runs: Mapped[int] = mapped_column(Integer, nullable=False)

has_task_concurrency_limits: Mapped[bool] = mapped_column(Boolean, nullable=False)
has_import_errors: Mapped[bool] = mapped_column(Boolean(), default=False, server_default="0")
has_import_errors: Mapped[bool | None] = mapped_column(Boolean(), default=False, server_default="0")
fail_fast: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="0")
allowed_run_types: Mapped[list[str] | None] = mapped_column(sa.JSON(), nullable=True)

Expand Down
9 changes: 5 additions & 4 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class DagRun(Base, LoggingMixin):
logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
_state: Mapped[str] = mapped_column("state", String(50), default=DagRunState.QUEUED)
_state: Mapped[str | None] = mapped_column("state", String(50), default=DagRunState.QUEUED, nullable=True)
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
creating_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
run_type: Mapped[str] = mapped_column(String(50), nullable=False)
Expand All @@ -186,17 +186,18 @@ class DagRun(Base, LoggingMixin):
# Foreign key to LogTemplate. DagRun rows created prior to this column's
# existence have this set to NULL. Later rows automatically populate this on
# insert to point to the latest LogTemplate entry.
log_template_id: Mapped[int] = mapped_column(
log_template_id: Mapped[int | None] = mapped_column(
Integer,
ForeignKey("log_template.id", name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
default=select(func.max(LogTemplate.__table__.c.id)),
nullable=True,
)
# This is nullable because it's too costly to migrate dagruns created prior
# to this column's addition (Airflow 3.2.0). If you want a reasonable
# meaningful non-null value, use ``dr.created_at or dr.run_after``.
created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=True, default=timezone.utcnow)
updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow
updated_at: Mapped[datetime | None] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=True
)
# Keeps track of the number of times the dagrun had been cleared.
# This number is incremented only when the DagRun is re-Queued,
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Log(Base):
__tablename__ = "log"

id: Mapped[int] = mapped_column(Integer, primary_key=True)
dttm: Mapped[datetime] = mapped_column(UtcDateTime)
dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
map_index: Mapped[int | None] = mapped_column(Integer, nullable=True)
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Pool(Base):
__tablename__ = "slot_pool"

id: Mapped[int] = mapped_column(Integer, primary_key=True)
pool: Mapped[str] = mapped_column(String(256), unique=True)
pool: Mapped[str | None] = mapped_column(String(256), unique=True, nullable=True)
# -1 for infinite
slots: Mapped[int] = mapped_column(Integer, default=0)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
Expand Down
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,22 +523,22 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
state: Mapped[str | None] = mapped_column(String(20), nullable=True)
try_number: Mapped[int] = mapped_column(Integer, default=0)
max_tries: Mapped[int] = mapped_column(Integer, server_default="-1")
hostname: Mapped[str] = mapped_column(String(1000))
unixname: Mapped[str] = mapped_column(String(1000))
hostname: Mapped[str | None] = mapped_column(String(1000), nullable=True)
unixname: Mapped[str | None] = mapped_column(String(1000), nullable=True)
pool: Mapped[str] = mapped_column(String(256), nullable=False)
pool_slots: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
queue: Mapped[str] = mapped_column(String(256))
priority_weight: Mapped[int] = mapped_column(Integer)
queue: Mapped[str | None] = mapped_column(String(256), nullable=True)
priority_weight: Mapped[int | None] = mapped_column(Integer, nullable=True)
operator: Mapped[str | None] = mapped_column(String(1000), nullable=True)
custom_operator_name: Mapped[str] = mapped_column(String(1000))
custom_operator_name: Mapped[str | None] = mapped_column(String(1000), nullable=True)
queued_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
scheduled_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
queued_by_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)

last_heartbeat_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
executor: Mapped[str | None] = mapped_column(String(1000), nullable=True)
executor_config: Mapped[dict] = mapped_column(ExecutorConfigType(pickler=dill))
executor_config: Mapped[dict | None] = mapped_column(ExecutorConfigType(pickler=dill), nullable=True)
updated_at: Mapped[datetime | None] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=True
)
Expand Down
Loading