Data Pipeline ve ETL Nedir? Veri Mühendisliği Rehberi
Farklı kaynaklardan gelen verileri birleştirmek mi istiyorsunuz? Raporlama sisteminiz yavaş mı? Data Pipeline ile verileri otomatik olarak toplayın, dönüştürün ve analiz için hazırlayın.
ETL Tanımı
ETL (Extract, Transform, Load), verinin kaynaklardan alınması, dönüştürülmesi ve hedef sisteme yüklenmesi sürecidir.
Extract (Çıkar) Transform (Dönüştür) Load (Yükle)
┌─ PostgreSQL ─┐ ┌──────────┐ ┌─ Data Warehouse ─┐
├─ API ├──→ │ Temizle │ ──→ │ BigQuery │
├─ CSV │ │ Birleştir│ │ Redshift │
└─ MongoDB ─┘ │ Hesapla │ │ Snowflake │
└──────────┘ └──────────────────┘
ETL vs ELT
| Yaklaşım | Dönüşüm | Ne Zaman? | |-----------|---------|-----------| | ETL | Yüklemeden önce | Eski sistemler, hassas veri | | ELT | Yükledikten sonra | Modern data warehouse, büyük veri |
Batch vs Streaming
Batch Processing
Verileri belirli aralıklarla toplu işleme:
Her gece 02:00'de:
→ Günün tüm siparişlerini çek
→ Dönüştür ve temizle
→ Data warehouse'a yükle
→ Raporları güncelle
Stream Processing
Verileri gerçek zamanlı, anlık işleme:
Her sipariş geldiğinde:
→ Kafka'dan oku
→ Anında dönüştür
→ Anında yükle
→ Dashboard güncel
| Özellik | Batch | Streaming | |---------|-------|-----------| | Gecikme | Dakikalar-saatler | Milisaniyeler | | Karmaşıklık | Düşük | Yüksek | | Veri hacmi | Çok büyük | Sürekli akış | | Kullanım | Raporlama, ML eğitimi | Gerçek zamanlı dashboard |
Basit ETL Örneği (Python)
import pandas as pd
from sqlalchemy import create_engine
# Extract
orders_df = pd.read_sql("SELECT * FROM orders WHERE date = CURRENT_DATE", source_db)
products_df = pd.read_csv("products.csv")
# Transform
merged = orders_df.merge(products_df, on='product_id')
merged['revenue'] = merged['quantity'] * merged['price']
merged['order_date'] = pd.to_datetime(merged['order_date'])
merged = merged.dropna(subset=['customer_id'])
# Load
target_engine = create_engine('postgresql://warehouse:5432/analytics')
merged.to_sql('daily_revenue', target_engine, if_exists='append', index=False)
Apache Airflow ile Orkestrasyon
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
'daily_sales_pipeline',
schedule_interval='0 2 * * *', # Her gece 02:00
start_date=datetime(2026, 1, 1),
catchup=False,
)
extract_task = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_warehouse,
dag=dag,
)
extract_task >> transform_task >> load_task
Data Pipeline Araçları
| Araç | Tür | Öne Çıkan | |------|-----|-----------| | Apache Airflow | Orkestrasyon | DAG tabanlı, Python | | Apache Kafka | Streaming | Yüksek throughput | | Apache Spark | İşleme | Batch + streaming | | dbt | Transform | SQL tabanlı, modern | | Prefect | Orkestrasyon | Modern Airflow alternatifi | | Fivetran | Extract/Load | SaaS, 300+ connector |
Veri Kalitesi
Kontroller:
✓ Null değer kontrolü → customer_id boş mu?
✓ Tip kontrolü → fiyat sayısal mı?
✓ Aralık kontrolü → yaş 0-150 arasında mı?
✓ Benzersizlik kontrolü → duplicate kayıt var mı?
✓ Referans kontrolü → product_id mevcut mu?
Best Practices
- Idempotent pipeline — Aynı pipeline'ı tekrar çalıştırmak aynı sonucu vermeli
- Incremental load — Her seferinde tüm veriyi değil, sadece değişenleri yükleyin
- Schema evolution — Şema değişikliklerine hazır olun
- Error handling — Hata durumunda retry, dead letter queue
- Data lineage — Verinin nereden geldiğini takip edin
- Monitoring — Pipeline süresini, hata sayısını, veri hacmini izleyin
Sonuç
Data Pipeline ve ETL, veriye dayalı karar almanın altyapısıdır. Batch işleme günlük raporlar için, streaming ise gerçek zamanlı analitik için uygundur. Doğru araç ve strateji ile güvenilir, ölçeklenebilir veri akışları kurun.
Data pipeline ve veri mühendisliğini LabLudus platformunda öğrenin.