M-Bus Gateway
← Tilbage til blog
· python· alembic· migration· postgresql· sqlmodel· multi-tenant· async· CI/CD· zero-downtime

Python Alembic migrationer — mønstre til multi-tenant SaaS

Alembic database migrations til Python SaaS: autogenerate, async engine, multi-tenant schema-strategi, zero-downtime column add, rollback-sikkerhed og CI/CD pipeline integration.

Af M-Bus Gateway

Alembic er den industristandard til database migrations med SQLAlchemy. Her er mønstrene der virker i multi-tenant SaaS med TimescaleDB.


Alembic opsætning med async engine

# server/alembic/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from server.src.db.models import SQLModel
from server.src.config import get_settings

# Alembic config
config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Target metadata til autogenerate
target_metadata = SQLModel.metadata


def run_migrations_offline() -> None:
    """Kør migrations uden DB-forbindelse (SQL script output)."""
    settings = get_settings()
    context.configure(
        url=str(settings.database_url),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,       # Detektér kolonne-type-ændringer
        compare_server_default=True,
    )
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection) -> None:
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        # Ignorer TimescaleDB's interne tabeller
        include_schemas=False,
        include_object=_include_object,
    )
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Kør migrations med async engine."""
    settings = get_settings()
    connectable = create_async_engine(
        str(settings.database_url),
        poolclass=pool.NullPool,    # Én forbindelse pr. migration-kørsel
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def _include_object(obj, name, type_, reflected, compare_to):
    """Ekskludér TimescaleDB og PostGIS interne tabeller fra autogenerate."""
    excluded_schemas = {"timescaledb_internal", "timescaledb_experimental", "topology"}
    if hasattr(obj, "schema") and obj.schema in excluded_schemas:
        return False
    # Ekskludér TimescaleDB chunk-tabeller
    if type_ == "table" and name.startswith("_hyper_"):
        return False
    return True


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

Autogenerate workflow

# Ny migration baseret på model-ændringer:
alembic revision --autogenerate -m "add_meter_mid_expiry_notification_sent"

# Gennemgå altid genereret migration inden apply:
cat alembic/versions/20260524_1234_add_meter_mid_expiry.py

# Apply migration:
alembic upgrade head

# Rollback én revision:
alembic downgrade -1

# Se historik:
alembic history --verbose

# Se nuværende revision:
alembic current

Zero-downtime column add

# alembic/versions/20260524_add_battery_level_to_readings.py
"""add battery_level_pct to readings

Revision ID: a1b2c3d4e5f6
Revises: 9f8e7d6c5b4a
Create Date: 2026-05-24 08:00:00
"""
from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    """
    Zero-downtime column add:
    1. ADD COLUMN med DEFAULT → PostgreSQL udfylder eksisterende rækker
    2. Ingen tabel-lock på TimescaleDB hypertable (PostgreSQL 11+)
    """
    op.add_column(
        "readings",
        sa.Column(
            "battery_level_pct",
            sa.SmallInteger(),
            nullable=True,          # NULLABLE — eksisterende rækker = NULL
            comment="Batteri % fra wM-Bus telegram (0-100)",
        ),
    )

    # Indeks til battery-trend dashboard (kun på hypertable-chunks)
    op.create_index(
        "ix_readings_battery_level",
        "readings",
        ["meter_installation_id", "timestamp", "battery_level_pct"],
        postgresql_where=sa.text("battery_level_pct IS NOT NULL"),
    )


def downgrade() -> None:
    op.drop_index("ix_readings_battery_level", table_name="readings")
    op.drop_column("readings", "battery_level_pct")

Multi-tenant migration-strategi

# alembic/versions/20260524_add_tenant_notification_prefs.py
"""add notification_prefs to tenants

Alle nye kolonner på tenant-tabellen er backward compatible.
Ingen breaking changes — applikationen håndterer NULL.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB


def upgrade() -> None:
    # JSONB kolonne til fleksible notification-præferencer
    op.add_column(
        "tenants",
        sa.Column(
            "notification_prefs",
            JSONB(),
            nullable=True,
            server_default=sa.text("'{}'::jsonb"),    # Default: tom dict
        ),
    )

    # Data migration: Sæt defaults for alle eksisterende tenants
    op.execute("""
        UPDATE tenants
        SET notification_prefs = '{
            "settlement_deadline": true,
            "gateway_stale": true,
            "monthly_digest": true,
            "reading_coverage": true
        }'::jsonb
        WHERE notification_prefs IS NULL OR notification_prefs = '{}'::jsonb
    """)

    # Sæt NOT NULL efter data migration
    op.alter_column("tenants", "notification_prefs", nullable=False)


def downgrade() -> None:
    op.drop_column("tenants", "notification_prefs")

TimescaleDB hypertable migration

# alembic/versions/20260524_create_readings_hypertable.py
"""convert readings to timescaledb hypertable

Kald kun én gang — idempotent via if_not_exists.
"""
from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    # 1. Opret readings-tabel (normal PostgreSQL)
    op.create_table(
        "readings",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("meter_installation_id", sa.String(), nullable=False),
        sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False),
        sa.Column("value_kwh", sa.Numeric(precision=12, scale=3), nullable=False),
        sa.Column("rssi_dbm", sa.SmallInteger(), nullable=True),
        sa.Column("battery_level_pct", sa.SmallInteger(), nullable=True),
        sa.Column("received_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("source", sa.String(50), nullable=True, server_default="gateway"),
    )

    # 2. Konvertér til TimescaleDB hypertable (7-dages chunks)
    op.execute("""
        SELECT create_hypertable(
            'readings',
            'timestamp',
            chunk_time_interval => INTERVAL '7 days',
            if_not_exists => TRUE
        )
    """)

    # 3. Retention policy (10 år)
    op.execute("""
        SELECT add_retention_policy(
            'readings', INTERVAL '10 years', if_not_exists => TRUE
        )
    """)

    # 4. Komprimering (chunks ældre end 30 dage)
    op.execute("""
        ALTER TABLE readings SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'meter_installation_id',
            timescaledb.compress_orderby = 'timestamp DESC'
        )
    """)
    op.execute("""
        SELECT add_compression_policy(
            'readings', INTERVAL '30 days', if_not_exists => TRUE
        )
    """)


def downgrade() -> None:
    # TimescaleDB hypertable kan ikke "un-konverteres" — drop og genopret
    op.drop_table("readings")

CI/CD pipeline integration

# .github/workflows/ci.yml (uddrag)
jobs:
  test-server:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: timescale/timescaledb:latest-pg16
        env:
          POSTGRES_PASSWORD: testpassword
          POSTGRES_DB: mbus_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Run Alembic migrations
        env:
          DATABASE_URL: postgresql+asyncpg://postgres:testpassword@localhost:5432/mbus_test
        run: |
          cd server
          alembic upgrade head
          # Verificér at migrations er idempotente
          alembic upgrade head

      - name: Run tests
        run: pytest server/tests/ -x -q --timeout=60
# server/tests/conftest.py — migration i test-setup
import pytest_asyncio
from alembic.config import Config
from alembic import command


@pytest_asyncio.fixture(scope="session", autouse=True)
async def apply_migrations(test_engine):
    """Kør Alembic migrations én gang per test-session."""
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option(
        "sqlalchemy.url",
        str(test_engine.url).replace("+asyncpg", "+psycopg2"),
    )
    command.upgrade(alembic_cfg, "head")

Konklusion

Alembic med async engine kræver asyncio.run() i env.py og NullPool under migrations. autogenerate med compare_type=True opdager kolonne-type-ændringer automatisk. Zero-downtime: tilføj NULLABLE kolonne, kør data-migration, sæt NOT NULL — aldrig ADD COLUMN NOT NULL på stor tabel. Multi-tenant JSONB-kolonner med server_default='{}::jsonb' er backward compatible. TimescaleDB-hypertable oprettes i migration via create_hypertable() SQL — downgrade() dropper og genopretter (hypertable kan ikke fjernes reversibelt). CI/CD kører alembic upgrade head som trin inden tests — og verificerer idempotens ved at køre det én gang til.

Se Python SQLAlchemy Core guide eller TimescaleDB IoT guide.