🐕PawData628 Rassen

Für Data Engineers

Von der CSV zu Erkenntnissen: ein praxisnahes Playbook

Der Datensatz liegt als CSV und SQLite mit normalisierter Quellen-Tabelle vor. So baust du daraus echte Pipelines mit Python, DuckDB, Airflow, dbt und Great Expectations — ohne es zu einer Enterprise-Ausgrabung zu machen.

1. Laden mit pandas oder Polars

Die Hauptdatei ist dog_breeds.csv (628 Zeilen, 44 Spalten). Eigenschaften liegen auf einer 1–5-Skala, Gewichte in Pfund, Größen in Zoll.

python🐕
import pandas as pd

df = pd.read_csv("data/dog_breeds.csv")

# Quick sanity pass
print(df.shape)               # (628, 44)
print(df["breed_group"].value_counts().head())
print(df["origin"].nunique(), "unique origins")

# Derived lifespan midpoint
df["lifespan_mid"] = (df["min_life_expectancy"] + df["max_life_expectancy"]) / 2
top = df.nlargest(10, "popularity_score")[["name", "breed_group", "origin"]]

Dasselbe mit Polars — schneller und typisiert:

python🐕
import polars as pl

df = pl.read_csv("data/dog_breeds.csv")
big_hounds = (
    df.filter((pl.col("breed_group") == "Hound") & (pl.col("size_category") == "large"))
      .select(["name", "origin", "max_weight_male"])
      .sort("max_weight_male", descending=True)
)

2. Dateiübergreifend abfragen mit DuckDB

DuckDB liest CSVs direkt, ohne Ladeschritt. Ideal, um Rassen mit Quellen und Bildern zu verknüpfen.

sql🐕
-- Count sources per breed
SELECT b.name, COUNT(s.source_url) AS source_count
FROM read_csv_auto('data/dog_breeds.csv') b
LEFT JOIN read_csv_auto('data/dog_breed_sources.csv') s
  ON s.breed_key = b.key
GROUP BY b.name
ORDER BY source_count DESC
LIMIT 10;

-- Find breeds missing primary images
SELECT b.key, b.name
FROM read_csv_auto('data/dog_breeds.csv') b
LEFT JOIN (
  SELECT breed_key FROM read_csv_auto('data/dog_breed_images_inventory.csv')
  WHERE is_primary = 1
) i ON i.breed_key = b.key
WHERE i.breed_key IS NULL;

3. Die mitgelieferte SQLite direkt nutzen

dog_breeds.sqlite enthält die Tabellen dogs, dog_sources und dog_images sowie die View reviewed_dogs und dataset_meta.

python🐕
import sqlite3, pandas as pd

con = sqlite3.connect("data/dog_breeds.sqlite")

# Widest lifespan ranges — good candidates for outlier review
q = """
SELECT name, min_life_expectancy, max_life_expectancy,
       (max_life_expectancy - min_life_expectancy) AS span
FROM dogs
WHERE min_life_expectancy IS NOT NULL
ORDER BY span DESC
LIMIT 20
"""
outliers = pd.read_sql_query(q, con)

4. Validieren mit Great Expectations

Bevor eine Pipeline in Produktion geht, fixiere deine Annahmen. Ein kleines Expectation-Suite wirkt Wunder:

python🐕
import great_expectations as gx

ctx = gx.get_context()
src = ctx.sources.add_pandas("dogs")
asset = src.add_csv_asset("dog_breeds", filepath_or_buffer="data/dog_breeds.csv")

batch = asset.build_batch_request()
validator = ctx.get_validator(batch_request=batch)

validator.expect_column_values_to_not_be_null("key")
validator.expect_column_values_to_be_unique("key")
validator.expect_column_values_to_be_between("energy", 1, 5)
validator.expect_column_values_to_be_between("trainability", 1, 5)
validator.expect_column_values_to_match_regex("source_url", r"^https?://")
validator.expect_column_values_to_be_in_set(
    "review_status", ["reviewed", "seeded"]
)
validator.save_expectation_suite("dog_breeds.basic")

5. Orchestrieren mit Airflow

Eine realistische tägliche Pipeline: Ingest → Validierung → Transformation → Publish. Hier mit der TaskFlow-API von Airflow 2.x:

python🐕
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="dog_breeds_daily",
    start_date=datetime(2026, 4, 17),
    schedule="@daily",
    catchup=False,
    tags=["dogs", "demo"],
)
def dog_breeds_pipeline():

    @task
    def ingest() -> str:
        import shutil
        src = "/opt/airflow/data/dog_breeds.csv"
        dst = "/opt/airflow/warehouse/raw/dog_breeds.csv"
        shutil.copyfile(src, dst)
        return dst

    @task
    def validate(path: str) -> str:
        import pandas as pd
        df = pd.read_csv(path)
        assert df["key"].is_unique, "breed key must be unique"
        assert df[["energy", "trainability"]].max().max() <= 5
        return path

    @task
    def transform(path: str) -> str:
        import duckdb
        duckdb.sql(f"""
            CREATE OR REPLACE TABLE mart_breed_profile AS
            SELECT key, name, breed_group, origin, size_category,
                   (min_life_expectancy + max_life_expectancy) / 2.0 AS lifespan_mid,
                   popularity_score
            FROM read_csv_auto('{path}')
        """)
        duckdb.sql("EXPORT DATABASE '/opt/airflow/warehouse/marts' (FORMAT PARQUET)")
        return "/opt/airflow/warehouse/marts"

    @task
    def publish(_marts: str):
        # push to S3, Snowflake, BigQuery, ...
        print("published ✅")

    publish(transform(validate(ingest())))

dog_breeds_pipeline()
Einen statischen Referenz-Datensatz täglich zu planen ist natürlich übertrieben — in echt würdest du nur auf neue Snapshots triggern (z. B. über einen S3KeySensor oder einen Dataset-URI-Trigger). Die DAG-Form bleibt identisch.

6. Modellieren mit dbt

Lege ein paar dbt-Modelle darüber, um Roh-, Staging- und Mart-Schicht sauber zu trennen:

sql🐕
-- models/staging/stg_breeds.sql
select
    key                                    as breed_key,
    name                                   as breed_name,
    nullif(breed_group, '')                as breed_group,
    nullif(origin, '')                     as origin,
    lower(size_category)                   as size_category,
    (min_life_expectancy + max_life_expectancy) / 2.0 as lifespan_mid_years,
    popularity_score
from {{ source('raw', 'dog_breeds') }};

-- models/marts/dim_breed.sql
select
    b.*,
    s.source_registry,
    s.source_url
from {{ ref('stg_breeds') }} b
left join {{ source('raw', 'dog_breed_sources') }} s
       on s.breed_key = b.breed_key and s.is_primary = 1;

7. SQL-Übungen mit Provenienz-Joins

Die mitgelieferte SQLite normalisiert Provenienz in dog_sources (n pro Rasse) und dog_images (n pro Rasse). Perfekte Spielwiese für 1:n-Joins, Aggregation und Window Functions. Probiere diese Übungen gegen dogs ⋈ dog_sources ⋈ dog_images.

1

Rassen mit nur einer Quelle

LEFT JOIN dogs auf dog_sources, GROUP BY Rasse, HAVING count(*) = 1. Genau die Zeilen, die eine zweite Prüfung brauchen.

sql🐕
-- Breeds backed by exactly one source — candidates for re-review
select b.key, b.name, count(s.source_id) as n_sources
from dogs b
left join dog_sources s on s.dog_id = b.id
group by b.key, b.name
having count(s.source_id) = 1
order by b.name;
2

Am häufigsten zitiertes Register

dog_sources nach source_registry gruppieren, count distinct dog_id pro Register, absteigend sortieren. Zeigt die Anker-Quellen des Datensatzes.

sql🐕
-- Most-cited source registry across the dataset
select s.source_registry,
       count(distinct s.dog_id) as n_breeds,
       count(*)                 as n_citations
from dog_sources s
group by s.source_registry
order by n_breeds desc
limit 10;
3

Prüfquote pro Rassegruppe

dogs nach breed_group aggregieren mit sum(review_status = 'reviewed')::float / count(*), per Window Function ranken. Findet Gruppen mit Kuratierungs-Lücken.

sql🐕
-- Reviewed coverage per breed group, ranked
select
    breed_group,
    count(*)                                         as n_total,
    sum(case when review_status = 'reviewed' then 1 else 0 end) as n_reviewed,
    round(100.0 * sum(case when review_status = 'reviewed' then 1 else 0 end)
          / count(*), 1)                             as pct_reviewed,
    rank() over (order by
        1.0 * sum(case when review_status = 'reviewed' then 1 else 0 end)
        / count(*) desc)                             as rnk
from dogs
where breed_group is not null
group by breed_group
order by rnk;
4

Bild-Verfügbarkeit pro Rasse

LEFT JOIN dog_images, COUNT(image_url) pro Rasse, dann per CASE in {0, 1, 2+} klassifizieren. Ideal, um ein UI mit echten Fotos zu seeden.

sql🐕
-- Image richness per breed, bucketed
with img_counts as (
    select b.id, b.name, count(i.image_url) as n_images
    from dogs b
    left join dog_images i on i.dog_id = b.id
    group by b.id, b.name
)
select
    case
        when n_images = 0 then '0 — no photo'
        when n_images = 1 then '1 — single photo'
        else '2+ — multiple photos'
    end                      as bucket,
    count(*)                 as n_breeds
from img_counts
group by bucket
order by bucket;
5

Herkunfts-Vielfalt je Gruppe

GROUP BY breed_group, COUNT(DISTINCT origin). Welche Gruppen sind regional konzentriert, welche weltweit vertreten?

sql🐕
-- Origin diversity per breed group
select breed_group,
       count(*)                     as n_breeds,
       count(distinct origin)       as n_origins,
       round(1.0 * count(distinct origin) / count(*), 2) as diversity_ratio
from dogs
where breed_group is not null and origin is not null
group by breed_group
order by diversity_ratio desc;

8. Analyseideen

  • Rassen-Clustering. k-means oder UMAP auf den 1–5-Eigenschaften laufen lassen, um „Archetypen“ jenseits der AKC-Gruppen zu finden.
  • Rassen-Empfehlungssystem. Eigenschaften anhand von Nutzerantworten gewichten (Wohnung? Kinder? Anfänger?) und eine Rangliste zurückgeben.
  • Herkunfts-Karte. origin geokodieren und Rassendichte pro Land plotten — mit Klima und cold_tolerance korrelieren.
  • Größe vs. Lebenserwartung. Den bekannten umgekehrten Zusammenhang zwischen Körpermasse und Lebenserwartung pro Gruppe bestätigen.
  • Quellen-Audit. Rassen pro source_registry zählen und Zeilen mit nur einer Quelle für eine erneute Prüfung markieren.
  • Wesens-NLP. temperament-Strings tokenisieren und einen Ko-Okkurrenz-Graph der Deskriptoren bauen.