ما هو مهندس البيانات؟

مهندس البيانات

مهندس البيانات هو متخصص يصمم ويبني أنظمة جمع البيانات وتخزينها ومعالجتها، مما يمكن الشركات من تحليل كميات كبيرة من البيانات بكفاءة.

تخزين البيانات

تصميم أنظمة تخزين البيانات بكفاءة

معالجة البيانات

تحويل وتنظيف البيانات للتحليل

ETL Pipelines

بناء خطوط أنابيب استخراج وتحويل وتحميل

معالجة الوقت الفعلي

معالجة البيانات المباشرة والتدفقات

اللغات والأدوات المستخدمة

Python

اللغة الأساسية للتعامل مع البيانات وتحليل النماذج

SQL

لإدارة قواعد البيانات المحلية واستعلام البيانات

Java/Scala

(اختياري) عند العمل مع Hadoop و Apache Spark

Shell Scripting

لأتمتة المهام في أنظمة Unix/Linux

Docker

لتوفير بيئة موحدة وأتمتة النشر

Git/GitHub

لإدارة النسخ البرمجية والتعاون

مهارات مهندس البيانات

1

Python Programming

إتقان برمجة Python للتعامل مع البيانات

2

SQL & Databases

إدارة قواعد البيانات واستعلامات SQL

3

Apache Spark

معالجة البيانات الضخمة بسرعة

4

ETL Pipelines

بناء خطوط أنابيب استخراج وتحويل وتحميل

5

Data Warehousing

تصميم وتنفيذ مستودعات البيانات

6

Stream Processing

معالجة البيانات في الوقت الفعلي

خارطة التعلم خطوة بخطوة

1

الخطوة 1: تعلم Python

Python هي اللغة الأساسية للتعامل مع البيانات وتحليل النماذج باستخدام مكتبات مثل TensorFlow Privacy و Fairlearn

الأهمية:

أساس هندسة البيانات

مثال برنامج Python أساسي:

# 1. أساسيات Python لهندسة البيانات
print("مرحبا هندسة البيانات!")

def greet(name):
    return f"مرحبا، {name}!"

print(greet('Data Engineer'))

# 2. التعامل مع البيانات في Python
import pandas as pd
import numpy as np
from datetime import datetime

# تحميل البيانات
data = {
    'user_id': [1, 2, 3, 4, 5],
    'name': ['أحمد', 'سارة', 'خالد', 'نورة', 'محمد'],
    'age': [25, 32, 28, 45, 38],
    'city': ['الرياض', 'جدة', 'الدمام', 'الرياض', 'مكة'],
    'salary': [50000, 65000, 55000, 80000, 60000],
    'join_date': ['2023-01-15', '2022-08-20', '2023-03-10', '2021-11-05', '2022-05-30']
}

df = pd.DataFrame(data)
df['join_date'] = pd.to_datetime(df['join_date'])

# 3. تحليل البيانات
print("=== معلومات أساسية عن البيانات ===")
print(f"عدد الصفوف: {df.shape[0]}")
print(f"عدد الأعمدة: {df.shape[1]}")
print(f"الأعمدة: {list(df.columns)}")
print(f"القيم الفارغة: {df.isnull().sum().sum()}")

print("\n=== الإحصائيات الوصفية ===")
print(df.describe())

print("\n=== معلومات عن الأنواع ===")
print(df.info())

# 4. تحويل وتنظيف البيانات
def clean_data(df):
    """تنظيف وتحويل البيانات"""
    # نسخ البيانات
    cleaned_df = df.copy()
    
    # تعبئة القيم المفقودة
    cleaned_df['age'] = cleaned_df['age'].fillna(cleaned_df['age'].median())
    cleaned_df['salary'] = cleaned_df['salary'].fillna(cleaned_df['salary'].mean())
    
    # تحويل الأنواع
    cleaned_df['age'] = cleaned_df['age'].astype(int)
    
    # إضافة أعمدة محسوبة
    cleaned_df['years_of_service'] = (datetime.now() - cleaned_df['join_date']).dt.days // 365
    cleaned_df['age_group'] = pd.cut(cleaned_df['age'], 
                                     bins=[0, 30, 40, 50, 100],
                                     labels=['شاب', 'شباب', 'وسط', 'كبير'])
    
    # إزالة القيم المتطرفة
    Q1 = cleaned_df['salary'].quantile(0.25)
    Q3 = cleaned_df['salary'].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    cleaned_df = cleaned_df[(cleaned_df['salary'] >= lower_bound) & 
                           (cleaned_df['salary'] <= upper_bound)]
    
    return cleaned_df

# 5. تجميع البيانات
cleaned_data = clean_data(df)
print("\n=== البيانات بعد التنظيف ===")
print(cleaned_data)

# تجميع حسب المدينة
city_stats = cleaned_data.groupby('city').agg({
    'user_id': 'count',
    'age': ['mean', 'min', 'max'],
    'salary': ['mean', 'sum', 'std']
}).round(2)

print("\n=== إحصائيات حسب المدينة ===")
print(city_stats)

# 6. التعامل مع الملفات
import json
import csv
import os

# حفظ البيانات بتنسيقات مختلفة
cleaned_data.to_csv('cleaned_users.csv', index=False, encoding='utf-8')
cleaned_data.to_json('cleaned_users.json', orient='records', force_ascii=False)
cleaned_data.to_parquet('cleaned_users.parquet', index=False)

print("\n=== حفظ البيانات ===")
print(f"CSV حجم الملف: {os.path.getsize('cleaned_users.csv')} بايت")
print(f"JSON حجم الملف: {os.path.getsize('cleaned_users.json')} بايت")
print(f"Parquet حجم الملف: {os.path.getsize('cleaned_users.parquet')} بايت")

# 7. APIs والتعامل مع الشبكة
import requests
from typing import Dict, List, Optional

class DataCollector:
    """فئة لجمع البيانات من APIs"""
    
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()
        
        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/json'
            })
    
    def fetch_data(self, endpoint: str, params: Dict = None) -> Dict:
        """جلب البيانات من API"""
        try:
            url = f"{self.base_url}/{endpoint}"
            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"خطأ في جلب البيانات: {e}")
            return {}
    
    def fetch_paginated_data(self, endpoint: str, page_size: int = 100) -> List[Dict]:
        """جلب بيانات متعددة الصفحات"""
        all_data = []
        page = 1
        
        while True:
            params = {'page': page, 'page_size': page_size}
            data = self.fetch_data(endpoint, params)
            
            if not data or 'results' not in data:
                break
            
            all_data.extend(data['results'])
            
            if 'next' not in data or not data['next']:
                break
            
            page += 1
        
        return all_data
    
    def save_to_file(self, data: List[Dict], filename: str, format: str = 'json'):
        """حفظ البيانات إلى ملف"""
        if format == 'json':
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        elif format == 'csv' and data:
            keys = data[0].keys()
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=keys)
                writer.writeheader()
                writer.writerows(data)
        
        print(f"تم حفظ البيانات في {filename}")

# 8. تعدد المهام (Multiprocessing)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def process_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """معالجة جزء من البيانات"""
    time.sleep(0.1)  # محاكاة عملية معالجة
    chunk['processed'] = True
    return chunk

def parallel_processing(df: pd.DataFrame, num_workers: int = 4) -> pd.DataFrame:
    """معالجة متوازية للبيانات"""
    # تقسيم البيانات
    chunk_size = len(df) // num_workers + 1
    chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
    
    # معالجة متوازية
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(process_chunk, chunks))
    
    # دمج النتائج
    return pd.concat(results, ignore_index=True)

# 9. تسجيل السجلات (Logging)
import logging
from logging.handlers import RotatingFileHandler

def setup_logger(name: str, log_file: str = 'data_pipeline.log') -> logging.Logger:
    """إعداد نظام تسجيل السجلات"""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    
    # إنشاء handlers
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10*1024*1024, backupCount=5
    )
    console_handler = logging.StreamHandler()
    
    # إنشاء formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # إضافة handlers
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# استخدام الـ logger
logger = setup_logger('data_engineer')
logger.info("بدء معالجة البيانات...")

try:
    # معالجة البيانات
    processed_data = parallel_processing(cleaned_data)
    logger.info(f"تمت معالجة {len(processed_data)} سجل بنجاح")
except Exception as e:
    logger.error(f"حدث خطأ في معالجة البيانات: {e}")

# 10. اختبار البيانات
import unittest

class TestDataPipeline(unittest.TestCase):
    """اختبار خط أنابيب البيانات"""
    
    def setUp(self):
        self.data = df.copy()
    
    def test_data_cleaning(self):
        """اختبار تنقية البيانات"""
        cleaned = clean_data(self.data)
        self.assertFalse(cleaned.isnull().any().any())
        self.assertEqual(len(cleaned), len(self.data))
    
    def test_data_types(self):
        """اختبار أنواع البيانات"""
        cleaned = clean_data(self.data)
        self.assertEqual(cleaned['age'].dtype, int)
        self.assertTrue(pd.api.types.is_datetime64_any_dtype(cleaned['join_date']))
    
    def test_calculated_columns(self):
        """اختبار الأعمدة المحسوبة"""
        cleaned = clean_data(self.data)
        self.assertIn('age_group', cleaned.columns)
        self.assertIn('years_of_service', cleaned.columns)

# تشغيل الاختبارات
if __name__ == '__main__':
    unittest.main()

# 11. نماذج التعلم الآلي الأساسية
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

def build_salary_model(df: pd.DataFrame) -> Dict:
    """بناء نموذج للتنبؤ بالراتب"""
    # تحضير البيانات
    features = pd.get_dummies(df[['age', 'city']], columns=['city'], drop_first=True)
    target = df['salary']
    
    # تقسيم البيانات
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )
    
    # بناء النموذج
    model = LinearRegression()
    model.fit(X_train, y_train)
    
    # التنبؤ والتقييم
    y_pred = model.predict(X_test)
    
    results = {
        'model': model,
        'mse': mean_squared_error(y_test, y_pred),
        'r2': r2_score(y_test, y_pred),
        'coefficients': dict(zip(features.columns, model.coef_))
    }
    
    return results

# استخدام النموذج
if len(cleaned_data) > 1:
    model_results = build_salary_model(cleaned_data)
    print(f"\n=== نتائج نموذج التنبؤ بالراتب ===")
    print(f"MSE: {model_results['mse']:.2f}")
    print(f"R²: {model_results['r2']:.2f}")
    print("المعاملات:", model_results['coefficients'])

# 12. الإنتاجية والتحسين
import timeit
import tracemalloc

def profile_function(func, *args, **kwargs):
    """قياس أداء الدالة"""
    tracemalloc.start()
    
    start_time = timeit.default_timer()
    result = func(*args, **kwargs)
    end_time = timeit.default_timer()
    
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    
    return {
        'result': result,
        'time': end_time - start_time,
        'memory_current': current / 10**6,
        'memory_peak': peak / 10**6
    }

# قياس أداء التنظيف
profile_result = profile_function(clean_data, df)
print(f"\n=== قياس الأداء ===")
print(f"الوقت: {profile_result['time']:.4f} ثانية")
print(f"الذاكرة الحالية: {profile_result['memory_current']:.2f} MB")
print(f"الذاكرة القصوى: {profile_result['memory_peak']:.2f} MB")
                        
2

الخطوة 2: تعلم SQL

SQL للإدارة قواعد البيانات المحلية واستعلام البيانات من قواعد البيانات العلائقية

الأهمية:

لإدارة قواعد البيانات المحلية باستخدام Room Database

مثال استعلامات SQL متقدمة:

-- 1. إنشاء وتكوين قواعد البيانات
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;

-- 2. إنشاء الجداول
CREATE TABLE IF NOT EXISTS customers (
    customer_id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INT CHECK (age >= 18),
    city VARCHAR(50),
    country VARCHAR(50) DEFAULT 'السعودية',
    registration_date DATE DEFAULT (CURRENT_DATE),
    total_spent DECIMAL(15,2) DEFAULT 0.00,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_city (city),
    INDEX idx_registration_date (registration_date)
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS products (
    product_id INT PRIMARY KEY AUTO_INCREMENT,
    product_name VARCHAR(200) NOT NULL,
    category VARCHAR(50),
    subcategory VARCHAR(50),
    brand VARCHAR(50),
    price DECIMAL(10,2) NOT NULL CHECK (price > 0),
    cost DECIMAL(10,2) NOT NULL,
    stock_quantity INT DEFAULT 0 CHECK (stock_quantity >= 0),
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_category (category),
    INDEX idx_brand (brand),
    INDEX idx_price (price)
);

CREATE TABLE IF NOT EXISTS orders (
    order_id INT PRIMARY KEY AUTO_INCREMENT,
    customer_id INT NOT NULL,
    order_date DATE NOT NULL,
    order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status ENUM('pending', 'processing', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
    shipping_city VARCHAR(50),
    shipping_country VARCHAR(50),
    total_amount DECIMAL(15,2) NOT NULL,
    discount_amount DECIMAL(10,2) DEFAULT 0.00,
    net_amount DECIMAL(15,2) GENERATED ALWAYS AS (total_amount - discount_amount) STORED,
    payment_method VARCHAR(20),
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
    INDEX idx_order_date (order_date),
    INDEX idx_status (status),
    INDEX idx_customer_order (customer_id, order_date)
);

CREATE TABLE IF NOT EXISTS order_items (
    order_item_id INT PRIMARY KEY AUTO_INCREMENT,
    order_id INT NOT NULL,
    product_id INT NOT NULL,
    quantity INT NOT NULL CHECK (quantity > 0),
    unit_price DECIMAL(10,2) NOT NULL,
    total_price DECIMAL(15,2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
    discount_percent DECIMAL(5,2) DEFAULT 0.00,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE,
    FOREIGN KEY (product_id) REFERENCES products(product_id) ON DELETE RESTRICT,
    INDEX idx_order_product (order_id, product_id),
    UNIQUE KEY unique_order_product (order_id, product_id)
);

-- 3. إدراج بيانات عينة
INSERT INTO customers (name, email, age, city, registration_date, total_spent) VALUES
('أحمد محمد', 'ahmed@example.com', 28, 'الرياض', '2023-01-15', 15000.00),
('سارة العتيبي', 'sara@example.com', 32, 'جدة', '2022-08-20', 25000.50),
('خالد السليم', 'khaled@example.com', 45, 'الرياض', '2021-11-05', 85000.75),
('نورة القحطاني', 'noura@example.com', 35, 'الدمام', '2023-03-10', 32000.25),
('محمد الحربي', 'mohammed@example.com', 29, 'مكة', '2022-05-30', 18000.00);

INSERT INTO products (product_name, category, subcategory, brand, price, cost, stock_quantity) VALUES
('لابتوب ديل', 'إلكترونيات', 'حواسيب', 'ديل', 3500.00, 2800.00, 50),
('هاتف سامسونج', 'إلكترونيات', 'هواتف', 'سامسونج', 2800.00, 2200.00, 100),
('تيشيرت قطني', 'ملابس', 'ملابس رجالية', 'نايكي', 150.00, 80.00, 200),
('عطر شانيل', 'عطور', 'عطور نسائية', 'شانيل', 500.00, 300.00, 30),
('كتاب تعلم Python', 'كتب', 'تقنية', 'دار النشر', 120.00, 70.00, 150);

INSERT INTO orders (customer_id, order_date, status, shipping_city, total_amount, discount_amount, payment_method) VALUES
(1, '2024-01-10', 'delivered', 'الرياض', 3650.00, 150.00, 'credit_card'),
(2, '2024-01-12', 'shipped', 'جدة', 2900.00, 100.00, 'paypal'),
(3, '2024-01-15', 'processing', 'الرياض', 1200.00, 50.00, 'bank_transfer'),
(1, '2024-01-18', 'pending', 'الرياض', 500.00, 0.00, 'credit_card'),
(4, '2024-01-20', 'delivered', 'الدمام', 1800.00, 100.00, 'paypal');

INSERT INTO order_items (order_id, product_id, quantity, unit_price, discount_percent) VALUES
(1, 1, 1, 3500.00, 5.00),
(1, 5, 2, 120.00, 0.00),
(2, 2, 1, 2800.00, 0.00),
(2, 3, 2, 150.00, 10.00),
(3, 4, 2, 500.00, 5.00),
(4, 5, 1, 120.00, 0.00),
(4, 3, 2, 150.00, 0.00),
(5, 2, 1, 2800.00, 0.00);

-- 4. استعلامات أساسية
-- استخراج العملاء فوق 30 سنة
SELECT customer_id, name, age, city, total_spent
FROM customers
WHERE age > 30
ORDER BY total_spent DESC;

-- عد العملاء حسب المدينة
SELECT 
    city,
    COUNT(*) AS customer_count,
    AVG(age) AS average_age,
    SUM(total_spent) AS total_spending,
    AVG(total_spent) AS avg_spending
FROM customers
GROUP BY city
HAVING COUNT(*) > 1
ORDER BY total_spending DESC;

-- 5. استعلامات متقدمة مع JOIN
SELECT 
    c.customer_id,
    c.name,
    c.city,
    o.order_id,
    o.order_date,
    o.status,
    o.total_amount,
    o.net_amount,
    oi.quantity,
    p.product_name,
    p.category
FROM customers c
INNER JOIN orders o ON c.customer_id = o.customer_id
INNER JOIN order_items oi ON o.order_id = oi.order_id
INNER JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC, o.total_amount DESC;

-- 6. الاستعلامات التجميعية المتقدمة
WITH customer_orders AS (
    SELECT 
        c.customer_id,
        c.name,
        c.city,
        COUNT(DISTINCT o.order_id) AS order_count,
        SUM(o.total_amount) AS total_spent,
        AVG(o.total_amount) AS avg_order_value,
        MAX(o.order_date) AS last_order_date,
        MIN(o.order_date) AS first_order_date
    FROM customers c
    LEFT JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, c.name, c.city
),
customer_segments AS (
    SELECT 
        *,
        CASE 
            WHEN total_spent >= 50000 THEN 'Platinum'
            WHEN total_spent >= 20000 THEN 'Gold'
            WHEN total_spent >= 5000 THEN 'Silver'
            ELSE 'Bronze'
        END AS customer_segment,
        CASE 
            WHEN last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) THEN 'Active'
            WHEN last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) THEN 'At Risk'
            ELSE 'Inactive'
        END AS engagement_status
    FROM customer_orders
)
SELECT 
    customer_segment,
    engagement_status,
    COUNT(*) AS customer_count,
    AVG(total_spent) AS avg_total_spent,
    AVG(order_count) AS avg_orders,
    SUM(total_spent) AS segment_revenue
FROM customer_segments
GROUP BY customer_segment, engagement_status
ORDER BY segment_revenue DESC;

-- 7. وظائف النافذة (Window Functions)
SELECT 
    customer_id,
    name,
    city,
    total_spent,
    RANK() OVER (PARTITION BY city ORDER BY total_spent DESC) AS rank_in_city,
    ROUND(PERCENT_RANK() OVER (ORDER BY total_spent) * 100, 2) AS percentile_rank,
    SUM(total_spent) OVER (PARTITION BY city) AS city_total_spent,
    AVG(total_spent) OVER (PARTITION BY city) AS city_avg_spent,
    total_spent - AVG(total_spent) OVER (PARTITION BY city) AS diff_from_city_avg
FROM customers
WHERE city IS NOT NULL
ORDER BY city, total_spent DESC;

-- 8. الاستعلامات العودية (Recursive CTEs)
WITH RECURSIVE date_series AS (
    SELECT 
        DATE('2024-01-01') AS date
    UNION ALL
    SELECT 
        date + INTERVAL 1 DAY
    FROM date_series
    WHERE date < '2024-01-31'
),
daily_sales AS (
    SELECT 
        ds.date,
        COALESCE(COUNT(DISTINCT o.order_id), 0) AS order_count,
        COALESCE(SUM(o.total_amount), 0) AS daily_revenue,
        COALESCE(SUM(o.net_amount), 0) AS daily_net_revenue
    FROM date_series ds
    LEFT JOIN orders o ON ds.date = o.order_date
    GROUP BY ds.date
)
SELECT 
    date,
    order_count,
    daily_revenue,
    daily_net_revenue,
    SUM(daily_revenue) OVER (ORDER BY date) AS cumulative_revenue,
    AVG(daily_revenue) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS weekly_avg_revenue,
    LAG(daily_revenue, 1) OVER (ORDER BY date) AS prev_day_revenue,
    ROUND(((daily_revenue - LAG(daily_revenue, 1) OVER (ORDER BY date)) / 
           LAG(daily_revenue, 1) OVER (ORDER BY date)) * 100, 2) AS growth_percentage
FROM daily_sales
ORDER BY date;

-- 9. الاستعلامات المعقدة مع Subqueries
SELECT 
    p.product_id,
    p.product_name,
    p.category,
    p.price,
    p.stock_quantity,
    oi.total_quantity_sold,
    oi.total_revenue,
    oi.avg_order_quantity,
    CASE 
        WHEN p.stock_quantity = 0 THEN 'نفذت الكمية'
        WHEN p.stock_quantity <= 10 THEN 'كمية قليلة'
        WHEN p.stock_quantity <= 50 THEN 'كمية متوسطة'
        ELSE 'كمية جيدة'
    END AS stock_status
FROM products p
LEFT JOIN (
    SELECT 
        product_id,
        SUM(quantity) AS total_quantity_sold,
        SUM(quantity * unit_price) AS total_revenue,
        AVG(quantity) AS avg_order_quantity
    FROM order_items
    GROUP BY product_id
) oi ON p.product_id = oi.product_id
WHERE p.is_active = TRUE
ORDER BY oi.total_revenue DESC NULLS LAST;

-- 10. إدارة المعاملات (Transactions)
START TRANSACTION;

-- تخفيض سعر المنتجات في فئة معينة
UPDATE products 
SET price = price * 0.9  -- تخفيض 10%
WHERE category = 'إلكترونيات'
AND brand = 'سامسونج';

-- تحديث المخزون
UPDATE products p
JOIN (
    SELECT 
        product_id,
        SUM(quantity) AS total_ordered
    FROM order_items oi
    JOIN orders o ON oi.order_id = o.order_id
    WHERE o.order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
    GROUP BY product_id
) recent_sales ON p.product_id = recent_sales.product_id
SET p.stock_quantity = p.stock_quantity - recent_sales.total_ordered
WHERE p.stock_quantity >= recent_sales.total_ordered;

-- إضافة سجل مراجعة
INSERT INTO product_audit (product_id, action_type, old_price, new_price, changed_by, change_date)
SELECT 
    product_id,
    'PRICE_UPDATE',
    price / 0.9,  -- السعر القديم
    price,        -- السعر الجديد
    CURRENT_USER(),
    CURRENT_TIMESTAMP()
FROM products 
WHERE category = 'إلكترونيات'
AND brand = 'سامسونج';

COMMIT;

-- 11. إنشاء Views و Stored Procedures
CREATE OR REPLACE VIEW customer_summary_view AS
SELECT 
    c.customer_id,
    c.name,
    c.email,
    c.city,
    c.country,
    c.registration_date,
    c.total_spent,
    COALESCE(o.order_count, 0) AS order_count,
    COALESCE(o.last_order_date, c.registration_date) AS last_order_date,
    CASE 
        WHEN o.last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) THEN 'نشط'
        WHEN o.last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) THEN 'شبه نشط'
        ELSE 'غير نشط'
    END AS activity_status
FROM customers c
LEFT JOIN (
    SELECT 
        customer_id,
        COUNT(*) AS order_count,
        MAX(order_date) AS last_order_date
    FROM orders
    GROUP BY customer_id
) o ON c.customer_id = o.customer_id;

-- Stored Procedure لحساب إحصائيات العملاء
DELIMITER //
CREATE PROCEDURE CalculateCustomerStats(
    IN p_city VARCHAR(50),
    OUT p_total_customers INT,
    OUT p_avg_spending DECIMAL(15,2),
    OUT p_total_revenue DECIMAL(15,2)
)
BEGIN
    -- حساب عدد العملاء
    SELECT COUNT(*), AVG(total_spent), SUM(total_spent)
    INTO p_total_customers, p_avg_spending, p_total_revenue
    FROM customers
    WHERE (p_city IS NULL OR city = p_city);
    
    -- تسجيل العملية
    INSERT INTO stats_log (operation_type, parameters, result_count, executed_by)
    VALUES ('CALCULATE_CUSTOMER_STATS', p_city, p_total_customers, CURRENT_USER());
END //
DELIMITER ;

-- استدعاء الـ Stored Procedure
CALL CalculateCustomerStats('الرياض', @total, @avg, @revenue);
SELECT @total AS total_customers, @avg AS avg_spending, @revenue AS total_revenue;

-- 12. التحسين والـ Explain Plans
EXPLAIN ANALYZE
SELECT 
    c.customer_id,
    c.name,
    o.order_id,
    o.order_date,
    o.total_amount,
    oi.quantity,
    p.product_name
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE c.city = 'الرياض'
AND o.order_date >= '2024-01-01'
ORDER BY o.total_amount DESC
LIMIT 100;

-- 13. النسخ الاحتياطي والاستعادة
-- نسخ احتياطي للبيانات
SELECT * INTO OUTFILE '/tmp/customers_backup.csv'
FIELDS TERMINATED BY ',' 
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM customers;

-- استعادة البيانات
LOAD DATA INFILE '/tmp/customers_backup.csv'
INTO TABLE customers
FIELDS TERMINATED BY ',' 
ENCLOSED BY '"'
LINES TERMINATED BY '\n';

-- 14. مراقبة الأداء
SHOW PROCESSLIST;
SHOW STATUS LIKE 'Threads_connected';
SHOW ENGINE INNODB STATUS;

-- 15. تنظيف البيانات
-- إزالة العملاء الذين لم يقوموا بأي طلب منذ أكثر من سنة
DELETE FROM customers 
WHERE customer_id IN (
    SELECT c.customer_id
    FROM customers c
    LEFT JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id
    HAVING MAX(o.order_date) < DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR)
    OR MAX(o.order_date) IS NULL
);

-- تحديث الإحصائيات
ANALYZE TABLE customers, orders, products, order_items;
OPTIMIZE TABLE customers, orders, products, order_items;
                        
3

الخطوة 3: تعلم Apache Spark

Apache Spark لمعالجة كميات ضخمة من البيانات بسرعة وكفاءة. يدعم معالجة البيانات الموزعة باستخدام RDDs (Resilient Distributed Datasets)

الأهمية:

ضروري لمعالجة البيانات الضخمة بشكل سريع وفعال

الأدوات:

Spark Python (واجهة PySpark)

مثال عملي معالجة بيانات باستخدام PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# 1. إنشاء جلسة Spark
spark = SparkSession.builder \
    .appName("Data Engineering Pipeline") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# 2. تحميل البيانات من مصادر مختلفة
# من ملفات CSV
customers_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .csv("hdfs://localhost:9000/data/customers/*.csv")

# من قاعدة بيانات JDBC
jdbc_url = "jdbc:mysql://localhost:3306/ecommerce_db"
properties = {
    "user": "root",
    "password": "password",
    "driver": "com.mysql.jdbc.Driver"
}

orders_df = spark.read \
    .jdbc(url=jdbc_url, table="orders", properties=properties)

# من Hive
products_df = spark.sql("SELECT * FROM ecommerce_db.products")

# 3. تحويلات ETL الأساسية
class ETLPipeline:
    """فئة لبناء خط أنابيب ETL"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def extract_data(self, source_config):
        """استخراج البيانات من مصادر مختلفة"""
        data_sources = []
        
        # من ملفات
        if 'csv_files' in source_config:
            df = self.spark.read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv(source_config['csv_files'])
            data_sources.append(df)
        
        # من قاعدة بيانات
        if 'jdbc' in source_config:
            df = self.spark.read \
                .jdbc(url=source_config['jdbc']['url'],
                     table=source_config['jdbc']['table'],
                     properties=source_config['jdbc']['properties'])
            data_sources.append(df)
        
        # من Kafka
        if 'kafka' in source_config:
            df = self.spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", source_config['kafka']['bootstrap_servers']) \
                .option("subscribe", source_config['kafka']['topic']) \
                .load()
            data_sources.append(df)
        
        return data_sources
    
    def transform_data(self, df, transformations):
        """تحويل البيانات"""
        transformed_df = df
        
        for transform in transformations:
            if transform['type'] == 'filter':
                transformed_df = transformed_df.filter(transform['condition'])
            
            elif transform['type'] == 'select':
                transformed_df = transformed_df.select(transform['columns'])
            
            elif transform['type'] == 'group_by':
                transformed_df = transformed_df.groupBy(transform['columns']) \
                    .agg(*transform['aggregations'])
            
            elif transform['type'] == 'join':
                transformed_df = transformed_df.join(
                    transform['other_df'],
                    transform['join_condition'],
                    transform['join_type']
                )
            
            elif transform['type'] == 'window_function':
                window_spec = Window.partitionBy(transform['partition_cols']) \
                    .orderBy(transform['order_cols'])
                for col_name, func in transform['functions'].items():
                    transformed_df = transformed_df.withColumn(
                        col_name, func().over(window_spec)
                    )
        
        return transformed_df
    
    def load_data(self, df, destination_config):
        """تحميل البيانات إلى وجهات مختلفة"""
        
        # إلى HDFS
        if destination_config.get('hdfs'):
            df.write \
                .mode(destination_config['hdfs']['mode']) \
                .option("header", "true") \
                .csv(destination_config['hdfs']['path'])
        
        # إلى قاعدة بيانات
        if destination_config.get('jdbc'):
            df.write \
                .mode(destination_config['jdbc']['mode']) \
                .jdbc(url=destination_config['jdbc']['url'],
                     table=destination_config['jdbc']['table'],
                     properties=destination_config['jdbc']['properties'])
        
        # إلى Hive
        if destination_config.get('hive'):
            df.write \
                .mode(destination_config['hive']['mode']) \
                .saveAsTable(destination_config['hive']['table_name'])
        
        # إلى Kafka
        if destination_config.get('kafka'):
            df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", destination_config['kafka']['bootstrap_servers']) \
                .option("topic", destination_config['kafka']['topic']) \
                .save()
    
    def run_pipeline(self, source_config, transformations, destination_config):
        """تشغيل خط الأنابيب الكامل"""
        print("بدء خط أنابيب ETL...")
        
        # استخراج
        data_sources = self.extract_data(source_config)
        print(f"تم استخراج {len(data_sources)} مصدر بيانات")
        
        # تحويل
        for i, df in enumerate(data_sources):
            print(f"تحويل مصدر البيانات {i+1}...")
            transformed_df = self.transform_data(df, transformations)
            
            # تحميل
            print(f"تحميل مصدر البيانات {i+1}...")
            self.load_data(transformed_df, destination_config)
        
        print("اكتمل خط أنابيب ETL!")

# 4. مثال عملي لخط أنابيب ETL
# تكوين المصادر
source_config = {
    'csv_files': 'hdfs://localhost:9000/data/raw/*.csv',
    'jdbc': {
        'url': 'jdbc:mysql://localhost:3306/ecommerce_db',
        'table': 'orders',
        'properties': {
            'user': 'root',
            'password': 'password',
            'driver': 'com.mysql.jdbc.Driver'
        }
    }
}

# تكوين التحويلات
transformations = [
    {
        'type': 'filter',
        'condition': col('total_amount') > 1000
    },
    {
        'type': 'select',
        'columns': ['customer_id', 'order_date', 'total_amount', 'status']
    },
    {
        'type': 'group_by',
        'columns': ['customer_id'],
        'aggregations': [
            count('*').alias('order_count'),
            sum('total_amount').alias('total_spent'),
            avg('total_amount').alias('avg_order_value')
        ]
    }
]

# تكوين الوجهات
destination_config = {
    'hdfs': {
        'path': 'hdfs://localhost:9000/data/processed/customer_summary',
        'mode': 'overwrite'
    },
    'jdbc': {
        'url': 'jdbc:mysql://localhost:3306/ecommerce_db',
        'table': 'customer_summary',
        'properties': {
            'user': 'root',
            'password': 'password',
            'driver': 'com.mysql.jdbc.Driver'
        },
        'mode': 'overwrite'
    }
}

# تشغيل خط الأنابيب
etl_pipeline = ETLPipeline(spark)
etl_pipeline.run_pipeline(source_config, transformations, destination_config)

# 5. معالجة البيانات الموزعة (Distributed Processing)
class DistributedDataProcessor:
    """معالج بيانات موزع"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def process_large_dataset(self, input_path, output_path):
        """معالجة مجموعة بيانات كبيرة"""
        # قراءة البيانات
        df = self.spark.read.parquet(input_path)
        
        # إعادة التقسيم الأمثل
        optimal_partitions = max(1, df.rdd.getNumPartitions() // 10)
        df = df.repartition(optimal_partitions, "customer_id")
        
        # معالجة متقدمة
        processed_df = df.transform(self._apply_transformations)
        
        # التحسينات
        processed_df.cache()
        processed_df.count()  # تفعيل التخزين المؤقت
        
        # حفظ النتائج
        processed_df.write \
            .mode("overwrite") \
            .partitionBy("year", "month") \
            .parquet(output_path)
        
        return processed_df
    
    def _apply_transformations(self, df):
        """تطبيق التحويلات على البيانات"""
        # تنظيف البيانات
        df = df.dropna(subset=["customer_id", "order_date"])
        
        # إضافة أعمدة محسوبة
        df = df.withColumn("order_year", year(col("order_date"))) \
               .withColumn("order_month", month(col("order_date"))) \
               .withColumn("order_quarter", quarter(col("order_date"))) \
               .withColumn("order_day", dayofmonth(col("order_date"))) \
               .withColumn("order_day_of_week", dayofweek(col("order_date")))
        
        # تجميع البيانات
        result = df.groupBy("customer_id", "order_year", "order_month") \
            .agg(
                count("*").alias("monthly_order_count"),
                sum("total_amount").alias("monthly_spent"),
                avg("total_amount").alias("avg_order_value"),
                max("order_date").alias("last_order_date"),
                min("order_date").alias("first_order_date"),
                collect_list("order_id").alias("order_ids")
            )
        
        return result
    
    def calculate_customer_lifetime_value(self, df):
        """حساب قيمة العميل مدى الحياة (CLV)"""
        
        # تعريف نافذة زمنية
        window_spec = Window.partitionBy("customer_id").orderBy("order_date")
        
        clv_df = df.withColumn("order_sequence", row_number().over(window_spec)) \
            .withColumn("days_between_orders", 
                       datediff(col("order_date"), 
                               lag("order_date", 1).over(window_spec))) \
            .withColumn("cumulative_spent", 
                       sum("total_amount").over(window_spec.rowsBetween(Window.unboundedPreceding, 0))) \
            .withColumn("avg_order_frequency", 
                       avg("days_between_orders").over(Window.partitionBy("customer_id"))) \
            .withColumn("predicted_future_orders", 
                       when(col("avg_order_frequency").isNotNull(),
                           365 / col("avg_order_frequency")).otherwise(0)) \
            .withColumn("predicted_future_value", 
                       col("predicted_future_orders") * avg("total_amount").over(Window.partitionBy("customer_id")))
        
        return clv_df

# 6. معالجة البيانات في الوقت الفعلي (Stream Processing)
from pyspark.sql.streaming import StreamingQuery

class RealTimeDataProcessor:
    """معالج بيانات في الوقت الفعلي"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def create_kafka_stream(self, bootstrap_servers, topics):
        """إنشاء تدفق من Kafka"""
        stream_df = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", topics) \
            .option("startingOffsets", "latest") \
            .load()
        
        # تحويل البيانات
        from pyspark.sql.functions import from_json
        schema = self._get_order_schema()
        
        parsed_stream = stream_df \
            .select(from_json(col("value").cast("string"), schema).alias("data")) \
            .select("data.*")
        
        return parsed_stream
    
    def _get_order_schema(self):
        """الحصول على مخطط بيانات الطلبات"""
        return StructType([
            StructField("order_id", StringType()),
            StructField("customer_id", StringType()),
            StructField("order_timestamp", TimestampType()),
            StructField("total_amount", DoubleType()),
            StructField("items", ArrayType(
                StructType([
                    StructField("product_id", StringType()),
                    StructField("quantity", IntegerType()),
                    StructField("price", DoubleType())
                ])
            ))
        ])
    
    def process_real_time_orders(self, stream_df):
        """معالجة الطلبات في الوقت الفعلي"""
        
        # معالجة التدفق
        processed_stream = stream_df \
            .withWatermark("order_timestamp", "5 minutes") \
            .groupBy(
                window("order_timestamp", "1 hour"),
                "customer_id"
            ).agg(
                count("*").alias("hourly_orders"),
                sum("total_amount").alias("hourly_spent"),
                avg("total_amount").alias("avg_order_value")
            )
        
        return processed_stream
    
    def write_stream_to_sink(self, stream_df, sink_type, **kwargs):
        """كتابة التدفق إلى وجهة"""
        if sink_type == "console":
            query = stream_df.writeStream \
                .outputMode("complete") \
                .format("console") \
                .option("truncate", "false") \
                .start()
        
        elif sink_type == "kafka":
            query = stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
                .writeStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kwargs["bootstrap_servers"]) \
                .option("topic", kwargs["topic"]) \
                .option("checkpointLocation", kwargs["checkpoint_location"]) \
                .start()
        
        elif sink_type == "parquet":
            query = stream_df.writeStream \
                .format("parquet") \
                .option("path", kwargs["path"]) \
                .option("checkpointLocation", kwargs["checkpoint_location"]) \
                .partitionBy(kwargs.get("partition_by", [])) \
                .start()
        
        return query

# 7. التحليل والتحسين
class SparkOptimizer:
    """محسن أداء Spark"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def analyze_query_plan(self, df):
        """تحليل خطة الاستعلام"""
        df.explain(extended=True)
        
        # عرض إحصائيات التحسين
        print("=== إحصائيات التحسين ===")
        print(f"عدد الأقسام: {df.rdd.getNumPartitions()}")
        print(f"تقدير عدد الصفوف: {df.count()}")
        
        return df
    
    def optimize_dataframe(self, df):
        """تحسين DataFrame"""
        # التحقق من عدد الأقسام
        current_partitions = df.rdd.getNumPartitions()
        if current_partitions > 200:
            print(f"تقليل الأقسام من {current_partitions} إلى 200")
            df = df.coalesce(200)
        
        # التحقق من حجم البيانات
        df.cache()
        row_count = df.count()
        print(f"عدد الصفوف: {row_count}")
        
        if row_count > 1000000:
            print("تطبيق Partitioning على البيانات الكبيرة")
            df = df.repartition(200, "customer_id")
        
        return df
    
    def monitor_performance(self, df, operation_name):
        """مراقبة أداء العملية"""
        import time
        
        start_time = time.time()
        start_memory = self.spark.sparkContext.getExecutorMemoryStatus()
        
        # تنفيذ العملية
        result = df.collect()
        
        end_time = time.time()
        end_memory = self.spark.sparkContext.getExecutorMemoryStatus()
        
        # حساب المقاييس
        execution_time = end_time - start_time
        memory_used = sum(end_memory.values()) - sum(start_memory.values())
        
        print(f"=== أداء {operation_name} ===")
        print(f"الوقت: {execution_time:.2f} ثانية")
        print(f"الذاكرة المستخدمة: {memory_used / (1024**2):.2f} MB")
        print(f"عدد الصفوف: {len(result)}")
        
        return result

# 8. التكامل مع أنظمة أخرى
class DataPipelineIntegrator:
    """مكامل خطوط أنابيب البيانات"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def integrate_with_airflow(self, dag_id, task_id, config):
        """التكامل مع Apache Airflow"""
        # إنشاء مهمة Airflow
        from airflow.operators.python import PythonOperator
        
        def spark_task(**kwargs):
            # تكوين Spark
            spark = SparkSession.builder \
                .appName(f"{dag_id}_{task_id}") \
                .config("spark.sql.shuffle.partitions", "100") \
                .getOrCreate()
            
            # تنفيذ المهمة
            result = self._execute_spark_job(spark, config)
            
            # إرجاع النتيجة إلى Airflow
            kwargs['ti'].xcom_push(key='spark_result', value=result)
            
            return result
        
        return PythonOperator(
            task_id=task_id,
            python_callable=spark_task,
            dag=kwargs.get('dag')
        )
    
    def _execute_spark_job(self, spark, config):
        """تنفيذ مهمة Spark"""
        # قراءة البيانات
        df = spark.read.format(config['input_format']) \
            .options(**config['input_options']) \
            .load(config['input_path'])
        
        # تطبيق التحويلات
        for transform in config['transformations']:
            df = df.transform(transform)
        
        # حفظ النتائج
        df.write.format(config['output_format']) \
            .options(**config['output_options']) \
            .mode(config['output_mode']) \
            .save(config['output_path'])
        
        return df.count()
    
    def integrate_with_kafka(self, bootstrap_servers, topic, schema):
        """التكامل مع Kafka"""
        # قراءة من Kafka
        df = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", topic) \
            .load()
        
        # تحويل البيانات
        from pyspark.sql.functions import from_json
        parsed_df = df.select(
            from_json(col("value").cast("string"), schema).alias("data")
        ).select("data.*")
        
        return parsed_df

# 9. اختبار خطوط الأنابيب
import unittest
from pyspark.testing import assertDataFrameEqual

class TestETLPipeline(unittest.TestCase):
    """اختبار خط أنابيب ETL"""
    
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .appName("ETL Pipeline Tests") \
            .master("local[2]") \
            .getOrCreate()
    
    def test_data_extraction(self):
        """اختبار استخراج البيانات"""
        # إنشاء بيانات تجريبية
        test_data = [
            (1, "أحمد", 28, "الرياض", 15000.0),
            (2, "سارة", 32, "جدة", 25000.5)
        ]
        schema = ["customer_id", "name", "age", "city", "total_spent"]
        expected_df = self.spark.createDataFrame(test_data, schema)
        
        # حفظ البيانات التجريبية
        expected_df.write.mode("overwrite").csv("test_data.csv")
        
        # قراءة البيانات
        actual_df = self.spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv("test_data.csv")
        
        # التحقق
        assertDataFrameEqual(actual_df, expected_df)
    
    def test_data_transformation(self):
        """اختبار تحويل البيانات"""
        # بيانات الإدخال
        input_data = [(1, 28, 15000.0), (2, 32, 25000.5)]
        input_df = self.spark.createDataFrame(input_data, ["id", "age", "spent"])
        
        # التحويل المتوقع
        expected_data = [(1, 28, 15000.0, "شاب"), (2, 32, 25000.5, "شباب")]
        expected_df = self.spark.createDataFrame(
            expected_data, 
            ["id", "age", "spent", "age_group"]
        )
        
        # تطبيق التحويل
        actual_df = input_df.withColumn(
            "age_group",
            when(col("age") < 30, "شاب")
            .when(col("age").between(30, 40), "شباب")
            .otherwise("كبير")
        )
        
        # التحقق
        assertDataFrameEqual(actual_df, expected_df)
    
    def test_data_aggregation(self):
        """اختبار تجميع البيانات"""
        # بيانات الإدخال
        input_data = [
            (1, "الرياض", 15000.0),
            (2, "الرياض", 25000.5),
            (3, "جدة", 18000.0)
        ]
        input_df = self.spark.createDataFrame(
            input_data, ["id", "city", "spent"]
        )
        
        # التجميع المتوقع
        expected_data = [
            ("الرياض", 2, 20000.25, 40000.5),
            ("جدة", 1, 18000.0, 18000.0)
        ]
        expected_df = self.spark.createDataFrame(
            expected_data,
            ["city", "customer_count", "avg_spent", "total_spent"]
        )
        
        # تطبيق التجميع
        actual_df = input_df.groupBy("city").agg(
            count("*").alias("customer_count"),
            avg("spent").alias("avg_spent"),
            sum("spent").alias("total_spent")
        )
        
        # التحقق
        assertDataFrameEqual(actual_df, expected_df)

# 10. إيقاف جلسة Spark
spark.stop()
                        
4

الخطوة 4: تعلم Hadoop

Hadoop هو إطار عمل لتخزين ومعالجة البيانات الضخمة باستخدام MapReduce و HDFS (Hadoop Distributed File System)

الأهمية:

الأساس لتخزين البيانات الضخمة عبر أنظمة متعددة

الأدوات:

Hortonworks Sandbox أو Hadoop CLI

مثال عملي تخزين بيانات باستخدام HDFS:

#!/bin/bash

# 1. إعداد وتكوين Hadoop
# تثبيت Hadoop (لـ Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y openjdk-11-jdk
wget https://downloads.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
tar -xzf hadoop-3.3.6.tar.gz
sudo mv hadoop-3.3.6 /usr/local/hadoop

# إعداد متغيرات البيئة
echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc
echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' >> ~/.bashrc
source ~/.bashrc

# 2. تكوين Hadoop
# ملف core-site.xml
cat > $HADOOP_HOME/etc/hadoop/core-site.xml << 'EOF'



    
        fs.defaultFS
        hdfs://localhost:9000
    
    
        hadoop.tmp.dir
        /tmp/hadoop
    

EOF

# ملف hdfs-site.xml
cat > $HADOOP_HOME/etc/hadoop/hdfs-site.xml << 'EOF'



    
        dfs.replication
        1
    
    
        dfs.namenode.name.dir
        /tmp/hadoop/namenode
    
    
        dfs.datanode.data.dir
        /tmp/hadoop/datanode
    
    
        dfs.permissions
        false
    

EOF

# ملف mapred-site.xml
cat > $HADOOP_HOME/etc/hadoop/mapred-site.xml << 'EOF'



    
        mapreduce.framework.name
        yarn
    
    
        mapreduce.jobhistory.address
        localhost:10020
    
    
        mapreduce.jobhistory.webapp.address
        localhost:19888
    

EOF

# ملف yarn-site.xml
cat > $HADOOP_HOME/etc/hadoop/yarn-site.xml << 'EOF'



    
        yarn.nodemanager.aux-services
        mapreduce_shuffle
    
    
        yarn.nodemanager.aux-services.mapreduce.shuffle.class
        org.apache.hadoop.mapred.ShuffleHandler
    
    
        yarn.resourcemanager.hostname
        localhost
    

EOF

# 3. تشغيل Hadoop
echo "تهيئة HDFS..."
hdfs namenode -format

echo "بدء خدمات Hadoop..."
start-dfs.sh
start-yarn.sh

echo "التحقق من حالة Hadoop..."
jps

# 4. أوامر HDFS الأساسية
echo "=== أوامر HDFS الأساسية ==="

# إنشاء مجلدات
echo "إنشاء مجلدات في HDFS..."
hdfs dfs -mkdir -p /data/raw
hdfs dfs -mkdir -p /data/processed
hdfs dfs -mkdir -p /data/backup
hdfs dfs -mkdir -p /logs

# عرض هيكل المجلدات
echo "عرض هيكل HDFS..."
hdfs dfs -ls -R /

# 5. تحميل البيانات إلى HDFS
echo "=== تحميل البيانات إلى HDFS ==="

# إنشاء بيانات تجريبية
echo "إنشاء بيانات تجريبية..."
cat > customers.csv << 'EOF'
customer_id,name,email,age,city,total_spent
1,أحمد محمد,ahmed@example.com,28,الرياض,15000.00
2,سارة العتيبي,sara@example.com,32,جدة,25000.50
3,خالد السليم,khaled@example.com,45,الرياض,85000.75
4,نورة القحطاني,noura@example.com,35,الدمام,32000.25
5,محمد الحربي,mohammed@example.com,29,مكة,18000.00
EOF

cat > orders.csv << 'EOF'
order_id,customer_id,order_date,total_amount,status
1001,1,2024-01-10,3650.00,delivered
1002,2,2024-01-12,2900.00,shipped
1003,3,2024-01-15,1200.00,processing
1004,1,2024-01-18,500.00,pending
1005,4,2024-01-20,1800.00,delivered
EOF

# تحميل الملفات إلى HDFS
echo "تحميل customers.csv إلى HDFS..."
hdfs dfs -put customers.csv /data/raw/
echo "تحميل orders.csv إلى HDFS..."
hdfs dfs -put orders.csv /data/raw/

# التحقق من الملفات المحملة
echo "التحقق من الملفات المحملة..."
hdfs dfs -ls /data/raw/

# 6. أوامر HDFS المتقدمة
echo "=== أوامر HDFS المتقدمة ==="

# عرض محتوى الملفات
echo "عرض محتوى customers.csv..."
hdfs dfs -cat /data/raw/customers.csv

# عرض أول 3 سطور
echo "عرض أول 3 سطور من orders.csv..."
hdfs dfs -head -3 /data/raw/orders.csv

# عرض آخر 3 سطور
echo "عرض آخر 3 سطور من orders.csv..."
hdfs dfs -tail -3 /data/raw/orders.csv

# حجم الملفات
echo "حجم الملفات في HDFS..."
hdfs dfs -du -h /data/raw/

# المساحة الإجمالية
echo "المساحة الإجمالية في HDFS..."
hdfs dfs -df -h

# 7. نسخ ونقل الملفات
echo "=== نسخ ونقل الملفات ==="

# نسخ من النظام المحلي إلى HDFS
echo "نسخ ملف محلي إلى HDFS..."
hdfs dfs -copyFromLocal customers.csv /data/backup/customers_backup.csv

# نسخ من HDFS إلى النظام المحلي
echo "نسخ من HDFS إلى النظام المحلي..."
hdfs dfs -copyToLocal /data/raw/orders.csv ./orders_local.csv

# نقل ملفات داخل HDFS
echo "نقل ملف داخل HDFS..."
hdfs dfs -mv /data/backup/customers_backup.csv /data/processed/

# 8. إدارة الأذونات
echo "=== إدارة أذونات الملفات ==="

# تغيير مالك الملف
echo "تغيير مالك الملف..."
hdfs dfs -chown hadoop:hadoop /data/raw/customers.csv

# تغيير الصلاحيات
echo "تغيير صلاحيات الملف..."
hdfs dfs -chmod 755 /data/raw/customers.csv

# عرض الأذونات
echo "عرض أذونات الملفات..."
hdfs dfs -ls -h /data/raw/

# 9. إدارة الحصص (Quotas)
echo "=== إدارة حصص التخزين ==="

# تعيين حصة للمجلد
echo "تعيين حصة تخزين..."
hdfs dfsadmin -setSpaceQuota 1G /data/raw

# عرض الحصة
echo "عرض حصة التخزين..."
hdfs dfs -count -q /data/raw

# إزالة الحصة
echo "إزالة حصة التخزين..."
hdfs dfsadmin -clrSpaceQuota /data/raw

# 10. فحص وصيانة HDFS
echo "=== فحص وصيانة HDFS ==="

# فحص نظام الملفات
echo "فحص نظام الملفات HDFS..."
hdfs fsck / -files -blocks -locations

# عرض تقرير HDFS
echo "عرض تقرير HDFS..."
hdfs dfsadmin -report

# عرض حالة NameNode
echo "عرض حالة NameNode..."
hdfs dfsadmin -printTopology

# 11. MapReduce مثال عملي
echo "=== تشغيل برنامج MapReduce ==="

# إنشاء برنامج MapReduce بسيط (WordCount)
cat > WordCount.java << 'EOF'
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    
    public static class TokenizerMapper 
        extends Mapper {
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    
    public static class IntSumReducer 
        extends Reducer {
        
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable values,
                           Context context
        ) throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
EOF

# إنشاء بيانات تجريبية للنص
cat > input.txt << 'EOF'
مرحبا عالم البيانات الضخمة
هندسة البيانات مهمة جدا
البيانات الضخمة تحتاج الى معالجة
هندسة البيانات تهتم بالمعالجة
EOF

# تحميل البيانات إلى HDFS
hdfs dfs -put input.txt /data/input/

# تجميع وتشغيل برنامج MapReduce
echo "تجميع برنامج MapReduce..."
javac -cp "$(hadoop classpath)" WordCount.java
jar cf wordcount.jar WordCount*.class

echo "تشغيل برنامج MapReduce..."
hadoop jar wordcount.jar WordCount /data/input /data/output

# عرض النتائج
echo "عرض نتائج MapReduce..."
hdfs dfs -cat /data/output/part-r-00000

# 12. HDFS Snapshots
echo "=== إدارة لقطات HDFS ==="

# تمكين الـ Snapshots
echo "تمكين الـ Snapshots..."
hdfs dfsadmin -allowSnapshot /data

# إنشاء Snapshot
echo "إنشاء Snapshot..."
hdfs dfs -createSnapshot /data data_snapshot_$(date +%Y%m%d)

# عرض الـ Snapshots
echo "عرض الـ Snapshots..."
hdfs dfs -ls /data/.snapshot

# استعادة من Snapshot
echo "استعادة ملف من Snapshot..."
hdfs dfs -cp /data/.snapshot/data_snapshot_*/raw/customers.csv /data/restored/

# حذف Snapshot
echo "حذف Snapshot..."
hdfs dfs -deleteSnapshot /data data_snapshot_$(date +%Y%m%d)

# 13. نسخ احتياطي واستعادة
echo "=== نسخ احتياطي واستعادة ==="

# نسخ احتياطي لـ HDFS
echo "نسخ احتياطي لـ HDFS..."
hdfs dfs -mkdir -p /backup
hadoop distcp hdfs://localhost:9000/data hdfs://localhost:9000/backup/data_backup

# التحقق من النسخة الاحتياطية
echo "التحقق من النسخة الاحتياطية..."
hdfs dfs -ls -R /backup/data_backup

# 14. مراقبة أداء HDFS
echo "=== مراقبة أداء HDFS ==="

# مراقبة استخدام HDFS
echo "استخدام HDFS الحالي:"
hdfs dfs -du -h /

# مراقبة DataNodes
echo "حالة DataNodes:"
hdfs dfsadmin -report | grep -A 5 "Live datanodes"

# مراقبة المساحة
echo "المساحة المتاحة:"
hdfs dfs -df -h

# 15. إيقاف خدمات Hadoop
echo "=== إيقاف خدمات Hadoop ==="

echo "إيقاف YARN..."
stop-yarn.sh

echo "إيقاف HDFS..."
stop-dfs.sh

echo "التحقق من توقف الخدمات..."
jps

echo "اكتملت عمليات HDFS!"

# 16. نصوص Bash مفيدة لإدارة Hadoop
cat > hadoop_manager.sh << 'EOF'
#!/bin/bash

HADOOP_HOME="/usr/local/hadoop"
LOG_FILE="/tmp/hadoop_manager.log"

log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

start_hadoop() {
    log_message "بدء تشغيل Hadoop..."
    
    # بدء HDFS
    $HADOOP_HOME/sbin/start-dfs.sh
    if [ $? -eq 0 ]; then
        log_message "تم بدء HDFS بنجاح"
    else
        log_message "فشل في بدء HDFS"
        return 1
    fi
    
    # بدء YARN
    $HADOOP_HOME/sbin/start-yarn.sh
    if [ $? -eq 0 ]; then
        log_message "تم بدء YARN بنجاح"
    else
        log_message "فشل في بدء YARN"
        return 1
    fi
    
    # التحقق من الخدمات
    jps | grep -E "NameNode|DataNode|ResourceManager|NodeManager"
    
    log_message "اكتمل بدء تشغيل Hadoop"
}

stop_hadoop() {
    log_message "إيقاف Hadoop..."
    
    # إيقاف YARN
    $HADOOP_HOME/sbin/stop-yarn.sh
    log_message "تم إيقاف YARN"
    
    # إيقاف HDFS
    $HADOOP_HOME/sbin/stop-dfs.sh
    log_message "تم إيقاف HDFS"
    
    log_message "اكتمل إيقاف Hadoop"
}

check_hadoop_status() {
    log_message "التحقق من حالة Hadoop..."
    
    # التحقق من HDFS
    hdfs dfsadmin -report > /tmp/hdfs_status.txt 2>&1
    if [ $? -eq 0 ]; then
        log_message "HDFS يعمل بشكل طبيعي"
    else
        log_message "HDFS غير متاح"
    fi
    
    # التحقق من YARN
    yarn node -list > /tmp/yarn_status.txt 2>&1
    if [ $? -eq 0 ]; then
        log_message "YARN يعمل بشكل طبيعي"
    else
        log_message "YARN غير متاح"
    fi
    
    # عرض معلومات الخدمات
    echo "=== حالة الخدمات ==="
    jps
}

clean_hadoop_data() {
    log_message "تنظيف بيانات Hadoop..."
    
    read -p "هل أنت متأكد من تنظيف جميع بيانات Hadoop؟ (yes/no): " confirm
    if [ "$confirm" != "yes" ]; then
        log_message "تم إلغاء التنظيف"
        return
    fi
    
    # إيقاف Hadoop
    stop_hadoop
    
    # حذف بيانات HDFS
    rm -rf /tmp/hadoop*
    log_message "تم حذف بيانات HDFS"
    
    # إعادة تهيئة NameNode
    hdfs namenode -format
    log_message "تم إعادة تهيئة NameNode"
    
    log_message "اكتمل تنظيف بيانات Hadoop"
}

backup_hdfs_data() {
    local backup_dir="/backup/hdfs_$(date +%Y%m%d_%H%M%S)"
    
    log_message "بدء النسخ الاحتياطي لـ HDFS إلى $backup_dir"
    
    # إنشاء مجلد النسخ الاحتياطي
    mkdir -p "$backup_dir"
    
    # نسخ بيانات HDFS
    hadoop distcp hdfs://localhost:9000/data "$backup_dir"
    
    if [ $? -eq 0 ]; then
        log_message "اكتمل النسخ الاحتياطي بنجاح"
        echo "النسخة الاحتياطية موجودة في: $backup_dir"
    else
        log_message "فشل النسخ الاحتياطي"
    fi
}

case "$1" in
    start)
        start_hadoop
        ;;
    stop)
        stop_hadoop
        ;;
    status)
        check_hadoop_status
        ;;
    clean)
        clean_hadoop_data
        ;;
    backup)
        backup_hdfs_data
        ;;
    restart)
        stop_hadoop
        sleep 5
        start_hadoop
        ;;
    *)
        echo "الاستخدام: $0 {start|stop|status|clean|backup|restart}"
        exit 1
        ;;
esac
EOF

chmod +x hadoop_manager.sh
echo "تم إنشاء برنامج إدارة Hadoop: ./hadoop_manager.sh"
                        
5

الخطوة 5: تعلم Airflow

Airflow لأتمتة وإدارة سير العمل المتعلقة ببياناتك

الأهمية:

ضروري لأتمتة العمليات اليومية مثل استخراج البيانات وتحويلها (ETL Pipelines) وتحميلها

الأدوات:

Airflow Web Interface

مثال عملي Apache Airflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.apache.kafka.operators.kafka import KafkaProducerOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import json
import pandas as pd
import requests

# 1. تعريف DAG الأساسي
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['data_team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# 2. إنشاء DAG
dag = DAG(
    'data_engineering_pipeline',
    default_args=default_args,
    description='خط أنابيب هندسة البيانات الكامل',
    schedule_interval='0 2 * * *',  # تشغيل يومياً في الساعة 2 صباحاً
    catchup=False,
    tags=['etl', 'data_pipeline', 'analytics'],
)

# 3. مهام البداية والنهاية
start_pipeline = DummyOperator(
    task_id='start_pipeline',
    dag=dag,
)

end_pipeline = DummyOperator(
    task_id='end_pipeline',
    dag=dag,
)

# 4. مهمة التحقق من توفر البيانات
check_data_availability = HttpSensor(
    task_id='check_data_availability',
    http_conn_id='data_api',
    endpoint='/api/health',
    response_check=lambda response: response.status_code == 200,
    timeout=300,
    poke_interval=30,
    mode='reschedule',
    dag=dag,
)

# 5. مهمة استخراج البيانات
def extract_data(**context):
    """استخراج البيانات من مصادر مختلفة"""
    execution_date = context['execution_date']
    
    # استخراج من API
    api_url = "https://api.example.com/data"
    response = requests.get(api_url)
    api_data = response.json()
    
    # استخراج من قاعدة بيانات
    import psycopg2
    conn = psycopg2.connect(
        host="localhost",
        database="source_db",
        user="user",
        password="password"
    )
    
    query = """
        SELECT * FROM sales 
        WHERE sale_date = %s
    """
    db_data = pd.read_sql(query, conn, params=[execution_date])
    conn.close()
    
    # حفظ البيانات المستخرجة
    api_data.to_json(f'/tmp/api_data_{execution_date}.json')
    db_data.to_csv(f'/tmp/db_data_{execution_date}.csv', index=False)
    
    return {
        'api_records': len(api_data),
        'db_records': len(db_data)
    }

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    provide_context=True,
    dag=dag,
)

# 6. مهمة تحويل البيانات
def transform_data(**context):
    """تحويل وتنظيف البيانات"""
    execution_date = context['execution_date']
    ti = context['ti']
    extraction_result = ti.xcom_pull(task_ids='extract_data')
    
    # قراءة البيانات المستخرجة
    api_data = pd.read_json(f'/tmp/api_data_{execution_date}.json')
    db_data = pd.read_csv(f'/tmp/db_data_{execution_date}.csv')
    
    # تحويلات API البيانات
    api_transformed = api_data.copy()
    api_transformed['processed_date'] = execution_date
    api_transformed['data_source'] = 'api'
    
    # تنظيف البيانات
    api_transformed = api_transformed.dropna(subset=['customer_id', 'amount'])
    api_transformed['amount'] = pd.to_numeric(api_transformed['amount'], errors='coerce')
    
    # تحويلات قاعدة البيانات
    db_transformed = db_data.copy()
    db_transformed['processed_date'] = execution_date
    db_transformed['data_source'] = 'database'
    
    # دمج البيانات
    merged_data = pd.concat([api_transformed, db_transformed], ignore_index=True)
    
    # تجميع البيانات
    aggregated = merged_data.groupby(['customer_id', 'data_source']).agg({
        'amount': ['sum', 'mean', 'count'],
        'processed_date': 'max'
    }).reset_index()
    
    # حفظ البيانات المحولة
    merged_data.to_csv(f'/tmp/merged_data_{execution_date}.csv', index=False)
    aggregated.to_csv(f'/tmp/aggregated_data_{execution_date}.csv', index=False)
    
    return {
        'total_records': len(merged_data),
        'aggregated_records': len(aggregated)
    }

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True,
    dag=dag,
)

# 7. مهمة تحميل البيانات إلى Data Warehouse
load_to_warehouse = PostgresOperator(
    task_id='load_to_warehouse',
    postgres_conn_id='warehouse_db',
    sql='''
        CREATE TABLE IF NOT EXISTS daily_sales (
            customer_id INTEGER,
            data_source VARCHAR(50),
            total_amount DECIMAL(15,2),
            avg_amount DECIMAL(10,2),
            transaction_count INTEGER,
            processed_date DATE,
            loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        COPY daily_sales(customer_id, data_source, total_amount, 
                        avg_amount, transaction_count, processed_date)
        FROM '/tmp/aggregated_data_{{ ds }}.csv'
        DELIMITER ','
        CSV HEADER;
    ''',
    dag=dag,
)

# 8. مهمة معالجة البيانات بـ Spark
process_with_spark = SparkSubmitOperator(
    task_id='process_with_spark',
    application='/opt/airflow/dags/spark_jobs/data_processing.py',
    conn_id='spark_default',
    application_args=[
        '--input-path', '/tmp/merged_data_{{ ds }}.csv',
        '--output-path', '/data/processed/{{ ds }}',
        '--execution-date', '{{ ds }}'
    ],
    jars='/opt/airflow/jars/spark-avro_2.12-3.3.0.jar',
    total_executor_cores=4,
    executor_cores=2,
    executor_memory='4g',
    driver_memory='2g',
    verbose=True,
    dag=dag,
)

# 9. مهمة إرسال البيانات إلى Kafka
send_to_kafka = KafkaProducerOperator(
    task_id='send_to_kafka',
    kafka_config_id='kafka_default',
    topic='processed_data',
    producer_function='airflow.providers.apache.kafka.producer.json_producer',
    producer_function_args=[
        'processed_data_topic',
        {'data': '/tmp/aggregated_data_{{ ds }}.csv', 'date': '{{ ds }}'}
    ],
    dag=dag,
)

# 10. مهمة مراقبة جودة البيانات
def data_quality_check(**context):
    """فحص جودة البيانات"""
    execution_date = context['execution_date']
    
    # قراءة البيانات المحولة
    data = pd.read_csv(f'/tmp/merged_data_{execution_date}.csv')
    
    checks = {}
    
    # فحص القيم المفقودة
    checks['missing_values'] = data.isnull().sum().sum()
    
    # فحص القيم المتطرفة
    numeric_cols = data.select_dtypes(include=['float64', 'int64']).columns
    for col in numeric_cols:
        q1 = data[col].quantile(0.25)
        q3 = data[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outliers = data[(data[col] < lower_bound) | (data[col] > upper_bound)][col].count()
        checks[f'outliers_{col}'] = outliers
    
    # فحص التكرارات
    checks['duplicates'] = data.duplicated().sum()
    
    # فحص صحة البيانات
    checks['negative_amounts'] = data[data['amount'] < 0]['amount'].count() if 'amount' in data.columns else 0
    
    # حفظ نتائج الفحص
    with open(f'/tmp/data_quality_{execution_date}.json', 'w') as f:
        json.dump(checks, f, indent=2)
    
    # التحقق من معايير الجودة
    quality_issues = []
    if checks['missing_values'] > 100:
        quality_issues.append(f"عدد كبير من القيم المفقودة: {checks['missing_values']}")
    
    if checks['duplicates'] > 50:
        quality_issues.append(f"عدد كبير من التكرارات: {checks['duplicates']}")
    
    if quality_issues:
        raise ValueError(f"مشاكل جودة البيانات: {', '.join(quality_issues)}")
    
    return checks

quality_check = PythonOperator(
    task_id='data_quality_check',
    python_callable=data_quality_check,
    provide_context=True,
    dag=dag,
)

# 11. مهمة إنشاء التقارير
def generate_reports(**context):
    """إنشاء تقارير البيانات"""
    execution_date = context['execution_date']
    
    # قراءة البيانات
    data = pd.read_csv(f'/tmp/aggregated_data_{execution_date}.csv')
    
    # تقرير المبيعات اليومية
    daily_sales_report = data.groupby('data_source').agg({
        'total_amount': 'sum',
        'transaction_count': 'sum',
        'avg_amount': 'mean'
    }).reset_index()
    
    daily_sales_report.to_csv(f'/reports/daily_sales_{execution_date}.csv', index=False)
    
    # تقرير العملاء
    customer_summary = data.groupby('customer_id').agg({
        'total_amount': 'sum',
        'transaction_count': 'sum',
        'avg_amount': 'mean'
    }).reset_index()
    
    customer_summary.to_csv(f'/reports/customer_summary_{execution_date}.csv', index=False)
    
    # تقرير PDF
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas
    
    pdf_path = f'/reports/summary_report_{execution_date}.pdf'
    c = canvas.Canvas(pdf_path, pagesize=letter)
    
    c.drawString(100, 750, f"تقرير المبيعات - {execution_date}")
    c.drawString(100, 730, f"إجمالي المبيعات: {daily_sales_report['total_amount'].sum():,.2f}")
    c.drawString(100, 710, f"عدد المعاملات: {daily_sales_report['transaction_count'].sum():,.0f}")
    
    c.save()
    
    return {
        'sales_report': f'/reports/daily_sales_{execution_date}.csv',
        'customer_report': f'/reports/customer_summary_{execution_date}.csv',
        'pdf_report': pdf_path
    }

generate_reports_task = PythonOperator(
    task_id='generate_reports',
    python_callable=generate_reports,
    provide_context=True,
    dag=dag,
)

# 12. مهمة التنظيف
cleanup_task = BashOperator(
    task_id='cleanup_temporary_files',
    bash_command='''
        rm -f /tmp/api_data_{{ ds }}.json
        rm -f /tmp/db_data_{{ ds }}.csv
        rm -f /tmp/merged_data_{{ ds }}.csv
        rm -f /tmp/aggregated_data_{{ ds }}.csv
        rm -f /tmp/data_quality_{{ ds }}.json
    ''',
    dag=dag,
)

# 13. مهمة إرسال الإشعارات
def send_notifications(**context):
    """إرسال إشعارات بنجاح العملية"""
    execution_date = context['execution_date']
    ti = context['ti']
    
    # جمع نتائج المهام
    extraction_result = ti.xcom_pull(task_ids='extract_data')
    transformation_result = ti.xcom_pull(task_ids='transform_data')
    quality_result = ti.xcom_pull(task_ids='data_quality_check')
    report_result = ti.xcom_pull(task_ids='generate_reports')
    
    # إنشاء رسالة الإشعار
    message = f"""
    ✅ اكتمل خط أنابيب البيانات بنجاح
    
    📅 التاريخ: {execution_date}
    
    📊 النتائج:
    - البيانات المستخرجة: {extraction_result['api_records']} من API، {extraction_result['db_records']} من قاعدة البيانات
    - البيانات المحولة: {transformation_result['total_records']} سجل
    - البيانات المجمعة: {transformation_result['aggregated_records']} سجل
    - جودة البيانات: ✅ جميع الاختبارات ناجحة
    
    📈 التقارير المولدة:
    - تقرير المبيعات: {report_result['sales_report']}
    - تقرير العملاء: {report_result['customer_report']}
    - تقرير PDF: {report_result['pdf_report']}
    
    ⏱️ وقت التشغيل: {{ task_instance.duration }} ثانية
    
    تم بنجاح! 🎉
    """
    
    # إرسال الإشعار (يمكن استخدام Slack، Email، etc.)
    print(message)
    
    # هنا يمكن إضافة كود لإرسال الإشعار عبر Slack، Email، etc.
    
    return message

notifications_task = PythonOperator(
    task_id='send_notifications',
    python_callable=send_notifications,
    provide_context=True,
    dag=dag,
)

# 14. تعريف تبعيات المهام
start_pipeline >> check_data_availability >> extract_task

extract_task >> transform_task >> quality_check

quality_check >> [load_to_warehouse, process_with_spark, send_to_kafka]

[load_to_warehouse, process_with_spark, send_to_kafka] >> generate_reports_task

generate_reports_task >> cleanup_task >> notifications_task >> end_pipeline

# 15. DAG إضافي للصيانة الأسبوعية
maintenance_dag = DAG(
    'weekly_maintenance',
    default_args=default_args,
    description='صيانة أسبوعية لأنظمة البيانات',
    schedule_interval='0 3 * * 0',  # كل أحد في الساعة 3 صباحاً
    catchup=False,
    tags=['maintenance', 'cleanup'],
)

# مهام الصيانة الأسبوعية
optimize_tables = PostgresOperator(
    task_id='optimize_database_tables',
    postgres_conn_id='warehouse_db',
    sql='''
        ANALYZE VERBOSE daily_sales;
        VACUUM ANALYZE daily_sales;
        REINDEX TABLE daily_sales;
    ''',
    dag=maintenance_dag,
)

cleanup_old_data = BashOperator(
    task_id='cleanup_old_data',
    bash_command='''
        # حذف البيانات القديمة (أكثر من 90 يوم)
        psql warehouse_db -c "
            DELETE FROM daily_sales 
            WHERE processed_date < CURRENT_DATE - INTERVAL '90 days';
        "
        
        # حذف التقارير القديمة
        find /reports -name "*.csv" -mtime +30 -delete
        find /reports -name "*.pdf" -mtime +30 -delete
    ''',
    dag=maintenance_dag,
)

backup_database = BashOperator(
    task_id='backup_database',
    bash_command='''
        # نسخ احتياطي لقاعدة البيانات
        pg_dump warehouse_db > /backups/warehouse_$(date +%Y%m%d).sql
        gzip /backups/warehouse_$(date +%Y%m%d).sql
        
        # حذف النسخ الاحتياطية القديمة
        find /backups -name "*.sql.gz" -mtime +30 -delete
    ''',
    dag=maintenance_dag,
)

# تبعيات مهام الصيانة
optimize_tables >> cleanup_old_data >> backup_database

# 16. اختبارات Airflow
def test_dag_integrity():
    """اختبار سلامة DAG"""
    from airflow.models import DagBag
    
    dag_bag = DagBag()
    
    assert len(dag_bag.import_errors) == 0, f"أخطاء في تحميل DAGs: {dag_bag.import_errors}"
    assert 'data_engineering_pipeline' in dag_bag.dags, "DAG غير موجود"
    
    dag = dag_bag.dags['data_engineering_pipeline']
    
    # التحقق من المهام
    tasks = dag.tasks
    assert len(tasks) == 12, f"عدد المهام المتوقع: 12، الفعلي: {len(tasks)}"
    
    # التحقق من التبعيات
    start_task = dag.get_task('start_pipeline')
    assert len(start_task.downstream_list) == 1, "مهمة البداية يجب أن يكون لها تابع واحد"
    
    print("✅ جميع اختبارات DAG ناجحة")

if __name__ == "__main__":
    test_dag_integrity()
                        
6

الخطوة 6: تعلم Kafka

Kafka للمعالجة البيانات في الوقت الفعلي (Real-Time Data Streaming)

الأهمية:

ضروري للمعالجة البيانات في الوقت الفعلي، مثل تحليلات الحدث

الأدوات:

Confluent Platform أو Kafka CLI

مثال عملي Apache Kafka:

from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError
import json
import time
from datetime import datetime
from typing import Dict, List, Optional
import threading
import logging

# 1. إعداد التسجيل (Logging)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 2. فئة لإدارة Kafka Producer
class DataStreamProducer:
    """منتج تدفق البيانات إلى Kafka"""
    
    def __init__(self, bootstrap_servers: List[str]):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self._connect()
    
    def _connect(self):
        """الاتصال بـ Kafka"""
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                key_serializer=lambda k: str(k).encode('utf-8') if k else None,
                acks='all',
                retries=3,
                compression_type='gzip',
                max_in_flight_requests_per_connection=1
            )
            logger.info("تم الاتصال بـ Kafka Producer بنجاح")
        except Exception as e:
            logger.error(f"فشل الاتصال بـ Kafka: {e}")
            raise
    
    def produce_message(self, topic: str, key: Optional[str], value: Dict):
        """إنتاج رسالة إلى Kafka"""
        try:
            future = self.producer.send(
                topic=topic,
                key=key,
                value=value
            )
            
            # انتظار التأكيد
            result = future.get(timeout=10)
            logger.info(f"تم إرسال الرسالة إلى {topic} - Partition: {result.partition}, Offset: {result.offset}")
            return True
        except KafkaError as e:
            logger.error(f"فشل إرسال الرسالة: {e}")
            return False
    
    def produce_batch(self, topic: str, messages: List[Dict]):
        """إنتاج دفعة من الرسائل"""
        success_count = 0
        fail_count = 0
        
        for i, message in enumerate(messages):
            key = message.get('id', str(i))
            if self.produce_message(topic, key, message):
                success_count += 1
            else:
                fail_count += 1
            
            # إضافة تأخير صغير بين الرسائل
            time.sleep(0.01)
        
        logger.info(f"الدفعة: {success_count} ناجحة، {fail_count} فاشلة")
        return success_count, fail_count
    
    def close(self):
        """إغلاق المنتج"""
        if self.producer:
            self.producer.flush()
            self.producer.close()
            logger.info("تم إغلاق Kafka Producer")

# 3. فئة لإدارة Kafka Consumer
class DataStreamConsumer:
    """مستهلك تدفق البيانات من Kafka"""
    
    def __init__(self, bootstrap_servers: List[str], group_id: str):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.consumer = None
        self.running = False
        self._connect()
    
    def _connect(self):
        """الاتصال بـ Kafka"""
        try:
            self.consumer = KafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                value_deserializer=lambda v: json.loads(v.decode('utf-8')),
                key_deserializer=lambda k: k.decode('utf-8') if k else None,
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                max_poll_records=100,
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000
            )
            logger.info("تم الاتصال بـ Kafka Consumer بنجاح")
        except Exception as e:
            logger.error(f"فشل الاتصال بـ Kafka: {e}")
            raise
    
    def subscribe(self, topics: List[str]):
        """الاشتراك في مواضيع"""
        self.consumer.subscribe(topics)
        logger.info(f"تم الاشتراك في المواضيع: {topics}")
    
    def consume_messages(self, callback):
        """استهلاك الرسائل"""
        self.running = True
        
        try:
            while self.running:
                # جلب الرسائل
                batch = self.consumer.poll(timeout_ms=1000)
                
                for topic_partition, messages in batch.items():
                    logger.info(f"جلب {len(messages)} رسالة من {topic_partition}")
                    
                    for message in messages:
                        try:
                            # معالجة الرسالة
                            result = callback({
                                'topic': topic_partition.topic,
                                'partition': topic_partition.partition,
                                'offset': message.offset,
                                'key': message.key,
                                'value': message.value,
                                'timestamp': message.timestamp
                            })
                            
                            if result is False:
                                logger.warning("توقف المعالجة بناءً على رد الدالة")
                                return
                        
                        except Exception as e:
                            logger.error(f"خطأ في معالجة الرسالة: {e}")
                
                # التزام بالإزاحة
                self.consumer.commit()
        
        except KeyboardInterrupt:
            logger.info("توقف المستهلك بواسطة المستخدم")
        except Exception as e:
            logger.error(f"خطأ في استهلاك الرسائل: {e}")
        finally:
            self.close()
    
    def consume_single_message(self, timeout_ms: int = 10000):
        """استهلاك رسالة واحدة"""
        try:
            messages = self.consumer.poll(timeout_ms=timeout_ms)
            
            for topic_partition, msgs in messages.items():
                if msgs:
                    message = msgs[0]
                    return {
                        'topic': topic_partition.topic,
                        'partition': topic_partition.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    }
        
        except Exception as e:
            logger.error(f"خطأ في استهلاك الرسالة: {e}")
        
        return None
    
    def close(self):
        """إغلاق المستهلك"""
        if self.consumer:
            self.consumer.close()
            logger.info("تم إغلاق Kafka Consumer")

# 4. فئة لإدارة Kafka Topics
class KafkaTopicManager:
    """مدير مواضيع Kafka"""
    
    def __init__(self, bootstrap_servers: List[str]):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = None
        self._connect()
    
    def _connect(self):
        """الاتصال بـ Kafka Admin"""
        try:
            self.admin_client = KafkaAdminClient(
                bootstrap_servers=self.bootstrap_servers,
                client_id='kafka_topic_manager'
            )
            logger.info("تم الاتصال بـ Kafka Admin Client بنجاح")
        except Exception as e:
            logger.error(f"فشل الاتصال بـ Kafka Admin: {e}")
            raise
    
    def create_topic(self, topic_name: str, num_partitions: int = 3, replication_factor: int = 1):
        """إنشاء موضوع جديد"""
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=num_partitions,
                replication_factor=replication_factor,
                topic_configs={
                    'retention.ms': '604800000',  # 7 أيام
                    'cleanup.policy': 'delete',
                    'compression.type': 'gzip'
                }
            )
            
            self.admin_client.create_topics([topic])
            logger.info(f"تم إنشاء الموضوع: {topic_name}")
            return True
        
        except Exception as e:
            logger.error(f"فشل إنشاء الموضوع {topic_name}: {e}")
            return False
    
    def delete_topic(self, topic_name: str):
        """حذف موضوع"""
        try:
            self.admin_client.delete_topics([topic_name])
            logger.info(f"تم حذف الموضوع: {topic_name}")
            return True
        
        except Exception as e:
            logger.error(f"فشل حذف الموضوع {topic_name}: {e}")
            return False
    
    def list_topics(self):
        """عرض جميع المواضيع"""
        try:
            topics = self.admin_client.list_topics()
            logger.info(f"المواضيع المتاحة: {topics}")
            return topics
        
        except Exception as e:
            logger.error(f"فشل عرض المواضيع: {e}")
            return []
    
    def describe_topic(self, topic_name: str):
        """وصف موضوع"""
        try:
            configs = self.admin_client.describe_configs(
                config_resources=[ConfigResource(ConfigResourceType.TOPIC, topic_name)]
            )
            
            for config_resource in configs:
                logger.info(f"إعدادات الموضوع {topic_name}:")
                for key, value in config_resource.items():
                    logger.info(f"  {key}: {value.value}")
            
            return configs
        
        except Exception as e:
            logger.error(f"فشل وصف الموضوع {topic_name}: {e}")
            return None
    
    def close(self):
        """إغلاق اتصال Admin"""
        if self.admin_client:
            self.admin_client.close()
            logger.info("تم إغلاق Kafka Admin Client")

# 5. فئة لمعالجة تدفق البيانات في الوقت الفعلي
class RealTimeDataProcessor:
    """معالج بيانات في الوقت الفعلي باستخدام Kafka"""
    
    def __init__(self, bootstrap_servers: List[str], source_topic: str, sink_topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.source_topic = source_topic
        self.sink_topic = sink_topic
        
        self.producer = DataStreamProducer(bootstrap_servers)
        self.consumer = DataStreamConsumer(bootstrap_servers, 'realtime_processor_group')
        self.topic_manager = KafkaTopicManager(bootstrap_servers)
    
    def setup_topics(self):
        """إعداد المواضيع"""
        # إنشاء مواضيع إذا لم تكن موجودة
        topics = self.topic_manager.list_topics()
        
        if self.source_topic not in topics:
            self.topic_manager.create_topic(self.source_topic)
        
        if self.sink_topic not in topics:
            self.topic_manager.create_topic(self.sink_topic)
    
    def process_message(self, message: Dict) -> Optional[Dict]:
        """معالجة رسالة فردية"""
        try:
            raw_data = message['value']
            
            # تحويل البيانات
            processed_data = {
                'id': raw_data.get('id', message['key']),
                'timestamp': datetime.now().isoformat(),
                'source_topic': message['topic'],
                'source_partition': message['partition'],
                'source_offset': message['offset'],
                'customer_id': raw_data.get('customer_id'),
                'event_type': raw_data.get('event_type'),
                'amount': raw_data.get('amount', 0),
                'location': raw_data.get('location', {}),
                'metadata': {
                    'processing_time': datetime.now().isoformat(),
                    'processor_version': '1.0.0'
                }
            }
            
            # تحويلات إضافية
            if 'amount' in raw_data:
                processed_data['amount_usd'] = raw_data['amount'] * 3.75  # تحويل إلى دولار
                processed_data['amount_category'] = self._categorize_amount(raw_data['amount'])
            
            if 'timestamp' in raw_data:
                processed_data['hour_of_day'] = datetime.fromisoformat(raw_data['timestamp']).hour
                processed_data['day_of_week'] = datetime.fromisoformat(raw_data['timestamp']).strftime('%A')
            
            # فحص الجودة
            if self._validate_data(processed_data):
                return processed_data
            else:
                logger.warning(f"بيانات غير صالحة: {processed_data['id']}")
                return None
        
        except Exception as e:
            logger.error(f"خطأ في معالجة الرسالة: {e}")
            return None
    
    def _categorize_amount(self, amount: float) -> str:
        """تصنيف المبلغ"""
        if amount < 100:
            return 'صغير'
        elif amount < 1000:
            return 'متوسط'
        else:
            return 'كبير'
    
    def _validate_data(self, data: Dict) -> bool:
        """التحقق من صحة البيانات"""
        required_fields = ['id', 'customer_id', 'event_type']
        
        for field in required_fields:
            if not data.get(field):
                return False
        
        if data.get('amount') and data['amount'] < 0:
            return False
        
        return True
    
    def start_processing(self):
        """بدء المعالجة"""
        logger.info("بدء معالجة البيانات في الوقت الفعلي")
        
        # إعداد المواضيع
        self.setup_topics()
        
        # الاشتراك في الموضوع المصدر
        self.consumer.subscribe([self.source_topic])
        
        # دالة رد الاتصال للمعالجة
        def process_callback(message):
            # معالجة الرسالة
            processed_data = self.process_message(message)
            
            if processed_data:
                # إرسال البيانات المعالجة إلى الموضوع الهدف
                success = self.producer.produce_message(
                    topic=self.sink_topic,
                    key=processed_data['id'],
                    value=processed_data
                )
                
                if success:
                    logger.info(f"تم معالجة وإرسال: {processed_data['id']}")
                else:
                    logger.error(f"فشل إرسال: {processed_data['id']}")
            
            return True  # مواصلة المعالجة
        
        # بدء استهلاك البيانات
        self.consumer.consume_messages(process_callback)
    
    def stop_processing(self):
        """إيقاف المعالجة"""
        logger.info("إيقاف معالجة البيانات")
        self.consumer.running = False
    
    def close(self):
        """إغلاق جميع الاتصالات"""
        self.producer.close()
        self.consumer.close()
        self.topic_manager.close()
        logger.info("تم إغلاق جميع اتصالات Kafka")

# 6. مثال استخدام
if __name__ == "__main__":
    # تكوين Kafka
    BOOTSTRAP_SERVERS = ['localhost:9092']
    SOURCE_TOPIC = 'raw_events'
    SINK_TOPIC = 'processed_events'
    
    # إنشاء معالج البيانات
    processor = RealTimeDataProcessor(BOOTSTRAP_SERVERS, SOURCE_TOPIC, SINK_TOPIC)
    
    try:
        # بدء المعالجة في thread منفصل
        processing_thread = threading.Thread(target=processor.start_processing)
        processing_thread.start()
        
        # إنتاج بعض البيانات التجريبية
        producer = DataStreamProducer(BOOTSTRAP_SERVERS)
        
        # بيانات تجريبية
        test_messages = [
            {
                'id': 'event_001',
                'customer_id': 'cust_123',
                'event_type': 'purchase',
                'amount': 150.50,
                'timestamp': datetime.now().isoformat(),
                'location': {'city': 'الرياض', 'country': 'السعودية'}
            },
            {
                'id': 'event_002',
                'customer_id': 'cust_456',
                'event_type': 'view',
                'amount': 0,
                'timestamp': datetime.now().isoformat(),
                'location': {'city': 'جدة', 'country': 'السعودية'}
            },
            {
                'id': 'event_003',
                'customer_id': 'cust_789',
                'event_type': 'purchase',
                'amount': 2500.00,
                'timestamp': datetime.now().isoformat(),
                'location': {'city': 'الدمام', 'country': 'السعودية'}
            }
        ]
        
        # إرسال البيانات
        for message in test_messages:
            producer.produce_message(SOURCE_TOPIC, message['id'], message)
            time.sleep(1)
        
        # انتظار قليل لمعالجة البيانات
        time.sleep(5)
        
        # استهلاك البيانات المعالجة
        consumer = DataStreamConsumer(BOOTSTRAP_SERVERS, 'test_consumer_group')
        consumer.subscribe([SINK_TOPIC])
        
        print("📥 البيانات المعالجة:")
        for _ in range(3):
            processed_message = consumer.consume_single_message(5000)
            if processed_message:
                print(json.dumps(processed_message['value'], indent=2, ensure_ascii=False))
        
        consumer.close()
        producer.close()
        
    except KeyboardInterrupt:
        print("\nإيقاف المعالجة...")
    finally:
        processor.stop_processing()
        processor.close()

# 7. التكامل مع Spark Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_spark_kafka_stream():
    """إنشاء تدفق Spark من Kafka"""
    spark = SparkSession.builder \
        .appName("KafkaSparkIntegration") \
        .config("spark.sql.shuffle.partitions", "100") \
        .getOrCreate()
    
    # قراءة من Kafka
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "processed_events") \
        .option("startingOffsets", "latest") \
        .load()
    
    # تعريف مخطط البيانات
    schema = StructType([
        StructField("id", StringType()),
        StructField("customer_id", StringType()),
        StructField("event_type", StringType()),
        StructField("amount", DoubleType()),
        StructField("amount_usd", DoubleType()),
        StructField("amount_category", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("hour_of_day", IntegerType()),
        StructField("day_of_week", StringType()),
        StructField("location", StructType([
            StructField("city", StringType()),
            StructField("country", StringType())
        ])),
        StructField("metadata", StructType([
            StructField("processing_time", TimestampType()),
            StructField("processor_version", StringType())
        ]))
    ])
    
    # تحويل البيانات
    parsed_df = kafka_df \
        .select(from_json(col("value").cast("string"), schema).alias("data")) \
        .select("data.*")
    
    # معالجة التدفق
    processed_stream = parsed_df \
        .withWatermark("timestamp", "5 minutes") \
        .groupBy(
            window("timestamp", "1 hour"),
            "event_type",
            "amount_category"
        ).agg(
            count("*").alias("event_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            approx_count_distinct("customer_id").alias("unique_customers")
        )
    
    return processed_stream

# 8. مراقبة Kafka
class KafkaMonitor:
    """مراقب حالة Kafka"""
    
    def __init__(self, bootstrap_servers: List[str]):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    
    def get_cluster_health(self):
        """الحصول على حالة الكتلة"""
        try:
            # الحصول على معلومات الكتلة
            cluster_description = self.admin_client.describe_cluster()
            
            health_info = {
                'cluster_id': cluster_description.cluster_id,
                'controller_id': cluster_description.controller.id,
                'broker_count': len(cluster_description.brokers),
                'brokers': [
                    {
                        'id': broker.id,
                        'host': broker.host,
                        'port': broker.port,
                        'rack': broker.rack
                    }
                    for broker in cluster_description.brokers
                ]
            }
            
            return health_info
        
        except Exception as e:
            logger.error(f"خطأ في الحصول على حالة الكتلة: {e}")
            return None
    
    def get_topic_metrics(self, topic_name: str):
        """الحصول على مقاييس الموضوع"""
        try:
            # الحصول على أقسام الموضوع
            partitions = self.admin_client.list_topics().get(topic_name, [])
            
            metrics = {
                'topic': topic_name,
                'partition_count': len(partitions),
                'partitions': []
            }
            
            # هنا يمكن إضافة كود للحصول على مقاييس الأقسام
            # مثل حجم البيانات، الإزاحات، etc.
            
            return metrics
        
        except Exception as e:
            logger.error(f"خطأ في الحصول على مقاييس الموضوع: {e}")
            return None
    
    def close(self):
        """إغلاق الاتصال"""
        self.admin_client.close()

# 9. اختبارات Kafka
import unittest

class TestKafkaIntegration(unittest.TestCase):
    """اختبار تكامل Kafka"""
    
    def setUp(self):
        self.bootstrap_servers = ['localhost:9092']
        self.test_topic = 'test_topic'
    
    def test_producer_connection(self):
        """اختبار اتصال المنتج"""
        producer = DataStreamProducer(self.bootstrap_servers)
        self.assertIsNotNone(producer.producer)
        producer.close()
    
    def test_consumer_connection(self):
        """اختبار اتصال المستهلك"""
        consumer = DataStreamConsumer(self.bootstrap_servers, 'test_group')
        self.assertIsNotNone(consumer.consumer)
        consumer.close()
    
    def test_topic_creation(self):
        """اختبار إنشاء الموضوع"""
        topic_manager = KafkaTopicManager(self.bootstrap_servers)
        success = topic_manager.create_topic(self.test_topic)
        self.assertTrue(success)
        
        # التحقق من وجود الموضوع
        topics = topic_manager.list_topics()
        self.assertIn(self.test_topic, topics)
        
        topic_manager.close()

if __name__ == '__main__':
    unittest.main()
                        

هندسة أنظمة البيانات

استخراج البيانات

APIs، قواعد بيانات، ملفات

معالجة البيانات

ETL/ELT، التنظيف، التحويل

تخزين البيانات

Data Warehouses، Data Lakes

التحليل

BI Tools، التحليلات، التقارير

أدوات هندسة البيانات

قواعد البيانات

PostgreSQL، MySQL، MongoDB، Cassandra

SQL NoSQL NewSQL

معالجة البيانات

Apache Spark، Apache Flink، Apache Beam

Batch Streaming Real-time

إدارة سير العمل

Apache Airflow، Luigi، Prefect

Orchestration Scheduling Monitoring

المزايا والتحديات

المزايا

  • طلب عالي: هناك طلب كبير على مهندسي البيانات، خاصة في الشركات التي تعتمد على البيانات الضخمة
  • أدوات مجانية: معظم الأدوات المستخدمة مثل Apache Spark و Hadoop مجانية ومفتوحة المصدر
  • مجتمع كبير: Python و Apache Spark لديهما مجتمعات نشطة توفر الدعم والموارد
  • إبداع لا محدود: يمكنك بناء أنظمة معقدة لمعالجة البيانات وتحليلها
  • رواتب ممتازة: مهندسو البيانات من أعلى المهن دخلاً في مجال التكنولوجيا

التحديات

  • منحنى التعلم الحاد: يتطلب فهماً جيداً لـ Python والرياضيات والإحصاء
  • حجم البيانات: قد تواجه تحديات في التعامل مع مجموعات بيانات كبيرة جداً
  • تحديثات متكررة: الأدوات والتقنيات تتطور باستمرار، مما يتطلب تحديث المعرفة بشكل منتظم
  • تعقيد الأنظمة: أنظمة هندسة البيانات معقدة وتتطلب فهماً عميقاً للهندسة

تخصصات في هندسة البيانات

هندسة ETL

تصميم وتنفيذ خطوط أنابيب استخراج وتحويل وتحميل

Airflow Spark SQL

هندسة التدفقات

معالجة البيانات في الوقت الفعلي والتدفقات

Kafka Flink Spark

هندسة المستودعات

تصميم وتنفيذ مستودعات البيانات

BigQuery Redshift Snowflake

الخلاصة

مهندس البيانات مجال متقدم ومطلوب بشدة في سوق العمل. من خلال إتقان Python، SQL، Apache Spark، Hadoop، Airflow، و Kafka، يمكنك بناء أنظمة متكاملة لجمع ومعالجة وتحليل البيانات بكفاءة عالية.

نصائح للبدء:

  • ابدأ بإتقان Python و SQL كأساس
  • تعلم Apache Spark لمعالجة البيانات الضخمة
  • اتقن إدارة قواعد البيانات SQL و NoSQL
  • تعلم Airflow لأتمتة سير العمل
  • جرب Kafka لمعالجة البيانات في الوقت الفعلي