مهندس البيانات
ما هو مهندس البيانات؟
مهندس البيانات
مهندس البيانات هو متخصص يصمم ويبني أنظمة جمع البيانات وتخزينها ومعالجتها، مما يمكن الشركات من تحليل كميات كبيرة من البيانات بكفاءة.
تخزين البيانات
تصميم أنظمة تخزين البيانات بكفاءة
معالجة البيانات
تحويل وتنظيف البيانات للتحليل
ETL Pipelines
بناء خطوط أنابيب استخراج وتحويل وتحميل
معالجة الوقت الفعلي
معالجة البيانات المباشرة والتدفقات
اللغات والأدوات المستخدمة
Python
اللغة الأساسية للتعامل مع البيانات وتحليل النماذج
SQL
لإدارة قواعد البيانات المحلية واستعلام البيانات
Java/Scala
(اختياري) عند العمل مع Hadoop و Apache Spark
Shell Scripting
لأتمتة المهام في أنظمة Unix/Linux
Docker
لتوفير بيئة موحدة وأتمتة النشر
Git/GitHub
لإدارة النسخ البرمجية والتعاون
مهارات مهندس البيانات
Python Programming
إتقان برمجة Python للتعامل مع البيانات
SQL & Databases
إدارة قواعد البيانات واستعلامات SQL
Apache Spark
معالجة البيانات الضخمة بسرعة
ETL Pipelines
بناء خطوط أنابيب استخراج وتحويل وتحميل
Data Warehousing
تصميم وتنفيذ مستودعات البيانات
Stream Processing
معالجة البيانات في الوقت الفعلي
خارطة التعلم خطوة بخطوة
الخطوة 1: تعلم Python
Python هي اللغة الأساسية للتعامل مع البيانات وتحليل النماذج باستخدام مكتبات مثل TensorFlow Privacy و Fairlearn
الأهمية:
أساس هندسة البيانات
مثال برنامج Python أساسي:
# 1. أساسيات Python لهندسة البيانات
print("مرحبا هندسة البيانات!")
def greet(name):
return f"مرحبا، {name}!"
print(greet('Data Engineer'))
# 2. التعامل مع البيانات في Python
import pandas as pd
import numpy as np
from datetime import datetime
# تحميل البيانات
data = {
'user_id': [1, 2, 3, 4, 5],
'name': ['أحمد', 'سارة', 'خالد', 'نورة', 'محمد'],
'age': [25, 32, 28, 45, 38],
'city': ['الرياض', 'جدة', 'الدمام', 'الرياض', 'مكة'],
'salary': [50000, 65000, 55000, 80000, 60000],
'join_date': ['2023-01-15', '2022-08-20', '2023-03-10', '2021-11-05', '2022-05-30']
}
df = pd.DataFrame(data)
df['join_date'] = pd.to_datetime(df['join_date'])
# 3. تحليل البيانات
print("=== معلومات أساسية عن البيانات ===")
print(f"عدد الصفوف: {df.shape[0]}")
print(f"عدد الأعمدة: {df.shape[1]}")
print(f"الأعمدة: {list(df.columns)}")
print(f"القيم الفارغة: {df.isnull().sum().sum()}")
print("\n=== الإحصائيات الوصفية ===")
print(df.describe())
print("\n=== معلومات عن الأنواع ===")
print(df.info())
# 4. تحويل وتنظيف البيانات
def clean_data(df):
"""تنظيف وتحويل البيانات"""
# نسخ البيانات
cleaned_df = df.copy()
# تعبئة القيم المفقودة
cleaned_df['age'] = cleaned_df['age'].fillna(cleaned_df['age'].median())
cleaned_df['salary'] = cleaned_df['salary'].fillna(cleaned_df['salary'].mean())
# تحويل الأنواع
cleaned_df['age'] = cleaned_df['age'].astype(int)
# إضافة أعمدة محسوبة
cleaned_df['years_of_service'] = (datetime.now() - cleaned_df['join_date']).dt.days // 365
cleaned_df['age_group'] = pd.cut(cleaned_df['age'],
bins=[0, 30, 40, 50, 100],
labels=['شاب', 'شباب', 'وسط', 'كبير'])
# إزالة القيم المتطرفة
Q1 = cleaned_df['salary'].quantile(0.25)
Q3 = cleaned_df['salary'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
cleaned_df = cleaned_df[(cleaned_df['salary'] >= lower_bound) &
(cleaned_df['salary'] <= upper_bound)]
return cleaned_df
# 5. تجميع البيانات
cleaned_data = clean_data(df)
print("\n=== البيانات بعد التنظيف ===")
print(cleaned_data)
# تجميع حسب المدينة
city_stats = cleaned_data.groupby('city').agg({
'user_id': 'count',
'age': ['mean', 'min', 'max'],
'salary': ['mean', 'sum', 'std']
}).round(2)
print("\n=== إحصائيات حسب المدينة ===")
print(city_stats)
# 6. التعامل مع الملفات
import json
import csv
import os
# حفظ البيانات بتنسيقات مختلفة
cleaned_data.to_csv('cleaned_users.csv', index=False, encoding='utf-8')
cleaned_data.to_json('cleaned_users.json', orient='records', force_ascii=False)
cleaned_data.to_parquet('cleaned_users.parquet', index=False)
print("\n=== حفظ البيانات ===")
print(f"CSV حجم الملف: {os.path.getsize('cleaned_users.csv')} بايت")
print(f"JSON حجم الملف: {os.path.getsize('cleaned_users.json')} بايت")
print(f"Parquet حجم الملف: {os.path.getsize('cleaned_users.parquet')} بايت")
# 7. APIs والتعامل مع الشبكة
import requests
from typing import Dict, List, Optional
class DataCollector:
"""فئة لجمع البيانات من APIs"""
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.api_key = api_key
self.session = requests.Session()
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
})
def fetch_data(self, endpoint: str, params: Dict = None) -> Dict:
"""جلب البيانات من API"""
try:
url = f"{self.base_url}/{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"خطأ في جلب البيانات: {e}")
return {}
def fetch_paginated_data(self, endpoint: str, page_size: int = 100) -> List[Dict]:
"""جلب بيانات متعددة الصفحات"""
all_data = []
page = 1
while True:
params = {'page': page, 'page_size': page_size}
data = self.fetch_data(endpoint, params)
if not data or 'results' not in data:
break
all_data.extend(data['results'])
if 'next' not in data or not data['next']:
break
page += 1
return all_data
def save_to_file(self, data: List[Dict], filename: str, format: str = 'json'):
"""حفظ البيانات إلى ملف"""
if format == 'json':
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
elif format == 'csv' and data:
keys = data[0].keys()
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=keys)
writer.writeheader()
writer.writerows(data)
print(f"تم حفظ البيانات في {filename}")
# 8. تعدد المهام (Multiprocessing)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def process_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
"""معالجة جزء من البيانات"""
time.sleep(0.1) # محاكاة عملية معالجة
chunk['processed'] = True
return chunk
def parallel_processing(df: pd.DataFrame, num_workers: int = 4) -> pd.DataFrame:
"""معالجة متوازية للبيانات"""
# تقسيم البيانات
chunk_size = len(df) // num_workers + 1
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
# معالجة متوازية
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(process_chunk, chunks))
# دمج النتائج
return pd.concat(results, ignore_index=True)
# 9. تسجيل السجلات (Logging)
import logging
from logging.handlers import RotatingFileHandler
def setup_logger(name: str, log_file: str = 'data_pipeline.log') -> logging.Logger:
"""إعداد نظام تسجيل السجلات"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# إنشاء handlers
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
console_handler = logging.StreamHandler()
# إنشاء formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# إضافة handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# استخدام الـ logger
logger = setup_logger('data_engineer')
logger.info("بدء معالجة البيانات...")
try:
# معالجة البيانات
processed_data = parallel_processing(cleaned_data)
logger.info(f"تمت معالجة {len(processed_data)} سجل بنجاح")
except Exception as e:
logger.error(f"حدث خطأ في معالجة البيانات: {e}")
# 10. اختبار البيانات
import unittest
class TestDataPipeline(unittest.TestCase):
"""اختبار خط أنابيب البيانات"""
def setUp(self):
self.data = df.copy()
def test_data_cleaning(self):
"""اختبار تنقية البيانات"""
cleaned = clean_data(self.data)
self.assertFalse(cleaned.isnull().any().any())
self.assertEqual(len(cleaned), len(self.data))
def test_data_types(self):
"""اختبار أنواع البيانات"""
cleaned = clean_data(self.data)
self.assertEqual(cleaned['age'].dtype, int)
self.assertTrue(pd.api.types.is_datetime64_any_dtype(cleaned['join_date']))
def test_calculated_columns(self):
"""اختبار الأعمدة المحسوبة"""
cleaned = clean_data(self.data)
self.assertIn('age_group', cleaned.columns)
self.assertIn('years_of_service', cleaned.columns)
# تشغيل الاختبارات
if __name__ == '__main__':
unittest.main()
# 11. نماذج التعلم الآلي الأساسية
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
def build_salary_model(df: pd.DataFrame) -> Dict:
"""بناء نموذج للتنبؤ بالراتب"""
# تحضير البيانات
features = pd.get_dummies(df[['age', 'city']], columns=['city'], drop_first=True)
target = df['salary']
# تقسيم البيانات
X_train, X_test, y_train, y_test = train_test_split(
features, target, test_size=0.2, random_state=42
)
# بناء النموذج
model = LinearRegression()
model.fit(X_train, y_train)
# التنبؤ والتقييم
y_pred = model.predict(X_test)
results = {
'model': model,
'mse': mean_squared_error(y_test, y_pred),
'r2': r2_score(y_test, y_pred),
'coefficients': dict(zip(features.columns, model.coef_))
}
return results
# استخدام النموذج
if len(cleaned_data) > 1:
model_results = build_salary_model(cleaned_data)
print(f"\n=== نتائج نموذج التنبؤ بالراتب ===")
print(f"MSE: {model_results['mse']:.2f}")
print(f"R²: {model_results['r2']:.2f}")
print("المعاملات:", model_results['coefficients'])
# 12. الإنتاجية والتحسين
import timeit
import tracemalloc
def profile_function(func, *args, **kwargs):
"""قياس أداء الدالة"""
tracemalloc.start()
start_time = timeit.default_timer()
result = func(*args, **kwargs)
end_time = timeit.default_timer()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return {
'result': result,
'time': end_time - start_time,
'memory_current': current / 10**6,
'memory_peak': peak / 10**6
}
# قياس أداء التنظيف
profile_result = profile_function(clean_data, df)
print(f"\n=== قياس الأداء ===")
print(f"الوقت: {profile_result['time']:.4f} ثانية")
print(f"الذاكرة الحالية: {profile_result['memory_current']:.2f} MB")
print(f"الذاكرة القصوى: {profile_result['memory_peak']:.2f} MB")
الخطوة 2: تعلم SQL
SQL للإدارة قواعد البيانات المحلية واستعلام البيانات من قواعد البيانات العلائقية
الأهمية:
لإدارة قواعد البيانات المحلية باستخدام Room Database
مثال استعلامات SQL متقدمة:
-- 1. إنشاء وتكوين قواعد البيانات
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 2. إنشاء الجداول
CREATE TABLE IF NOT EXISTS customers (
customer_id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INT CHECK (age >= 18),
city VARCHAR(50),
country VARCHAR(50) DEFAULT 'السعودية',
registration_date DATE DEFAULT (CURRENT_DATE),
total_spent DECIMAL(15,2) DEFAULT 0.00,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_city (city),
INDEX idx_registration_date (registration_date)
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS products (
product_id INT PRIMARY KEY AUTO_INCREMENT,
product_name VARCHAR(200) NOT NULL,
category VARCHAR(50),
subcategory VARCHAR(50),
brand VARCHAR(50),
price DECIMAL(10,2) NOT NULL CHECK (price > 0),
cost DECIMAL(10,2) NOT NULL,
stock_quantity INT DEFAULT 0 CHECK (stock_quantity >= 0),
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_category (category),
INDEX idx_brand (brand),
INDEX idx_price (price)
);
CREATE TABLE IF NOT EXISTS orders (
order_id INT PRIMARY KEY AUTO_INCREMENT,
customer_id INT NOT NULL,
order_date DATE NOT NULL,
order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status ENUM('pending', 'processing', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
shipping_city VARCHAR(50),
shipping_country VARCHAR(50),
total_amount DECIMAL(15,2) NOT NULL,
discount_amount DECIMAL(10,2) DEFAULT 0.00,
net_amount DECIMAL(15,2) GENERATED ALWAYS AS (total_amount - discount_amount) STORED,
payment_method VARCHAR(20),
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
INDEX idx_order_date (order_date),
INDEX idx_status (status),
INDEX idx_customer_order (customer_id, order_date)
);
CREATE TABLE IF NOT EXISTS order_items (
order_item_id INT PRIMARY KEY AUTO_INCREMENT,
order_id INT NOT NULL,
product_id INT NOT NULL,
quantity INT NOT NULL CHECK (quantity > 0),
unit_price DECIMAL(10,2) NOT NULL,
total_price DECIMAL(15,2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
discount_percent DECIMAL(5,2) DEFAULT 0.00,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE,
FOREIGN KEY (product_id) REFERENCES products(product_id) ON DELETE RESTRICT,
INDEX idx_order_product (order_id, product_id),
UNIQUE KEY unique_order_product (order_id, product_id)
);
-- 3. إدراج بيانات عينة
INSERT INTO customers (name, email, age, city, registration_date, total_spent) VALUES
('أحمد محمد', 'ahmed@example.com', 28, 'الرياض', '2023-01-15', 15000.00),
('سارة العتيبي', 'sara@example.com', 32, 'جدة', '2022-08-20', 25000.50),
('خالد السليم', 'khaled@example.com', 45, 'الرياض', '2021-11-05', 85000.75),
('نورة القحطاني', 'noura@example.com', 35, 'الدمام', '2023-03-10', 32000.25),
('محمد الحربي', 'mohammed@example.com', 29, 'مكة', '2022-05-30', 18000.00);
INSERT INTO products (product_name, category, subcategory, brand, price, cost, stock_quantity) VALUES
('لابتوب ديل', 'إلكترونيات', 'حواسيب', 'ديل', 3500.00, 2800.00, 50),
('هاتف سامسونج', 'إلكترونيات', 'هواتف', 'سامسونج', 2800.00, 2200.00, 100),
('تيشيرت قطني', 'ملابس', 'ملابس رجالية', 'نايكي', 150.00, 80.00, 200),
('عطر شانيل', 'عطور', 'عطور نسائية', 'شانيل', 500.00, 300.00, 30),
('كتاب تعلم Python', 'كتب', 'تقنية', 'دار النشر', 120.00, 70.00, 150);
INSERT INTO orders (customer_id, order_date, status, shipping_city, total_amount, discount_amount, payment_method) VALUES
(1, '2024-01-10', 'delivered', 'الرياض', 3650.00, 150.00, 'credit_card'),
(2, '2024-01-12', 'shipped', 'جدة', 2900.00, 100.00, 'paypal'),
(3, '2024-01-15', 'processing', 'الرياض', 1200.00, 50.00, 'bank_transfer'),
(1, '2024-01-18', 'pending', 'الرياض', 500.00, 0.00, 'credit_card'),
(4, '2024-01-20', 'delivered', 'الدمام', 1800.00, 100.00, 'paypal');
INSERT INTO order_items (order_id, product_id, quantity, unit_price, discount_percent) VALUES
(1, 1, 1, 3500.00, 5.00),
(1, 5, 2, 120.00, 0.00),
(2, 2, 1, 2800.00, 0.00),
(2, 3, 2, 150.00, 10.00),
(3, 4, 2, 500.00, 5.00),
(4, 5, 1, 120.00, 0.00),
(4, 3, 2, 150.00, 0.00),
(5, 2, 1, 2800.00, 0.00);
-- 4. استعلامات أساسية
-- استخراج العملاء فوق 30 سنة
SELECT customer_id, name, age, city, total_spent
FROM customers
WHERE age > 30
ORDER BY total_spent DESC;
-- عد العملاء حسب المدينة
SELECT
city,
COUNT(*) AS customer_count,
AVG(age) AS average_age,
SUM(total_spent) AS total_spending,
AVG(total_spent) AS avg_spending
FROM customers
GROUP BY city
HAVING COUNT(*) > 1
ORDER BY total_spending DESC;
-- 5. استعلامات متقدمة مع JOIN
SELECT
c.customer_id,
c.name,
c.city,
o.order_id,
o.order_date,
o.status,
o.total_amount,
o.net_amount,
oi.quantity,
p.product_name,
p.category
FROM customers c
INNER JOIN orders o ON c.customer_id = o.customer_id
INNER JOIN order_items oi ON o.order_id = oi.order_id
INNER JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC, o.total_amount DESC;
-- 6. الاستعلامات التجميعية المتقدمة
WITH customer_orders AS (
SELECT
c.customer_id,
c.name,
c.city,
COUNT(DISTINCT o.order_id) AS order_count,
SUM(o.total_amount) AS total_spent,
AVG(o.total_amount) AS avg_order_value,
MAX(o.order_date) AS last_order_date,
MIN(o.order_date) AS first_order_date
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.city
),
customer_segments AS (
SELECT
*,
CASE
WHEN total_spent >= 50000 THEN 'Platinum'
WHEN total_spent >= 20000 THEN 'Gold'
WHEN total_spent >= 5000 THEN 'Silver'
ELSE 'Bronze'
END AS customer_segment,
CASE
WHEN last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) THEN 'Active'
WHEN last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) THEN 'At Risk'
ELSE 'Inactive'
END AS engagement_status
FROM customer_orders
)
SELECT
customer_segment,
engagement_status,
COUNT(*) AS customer_count,
AVG(total_spent) AS avg_total_spent,
AVG(order_count) AS avg_orders,
SUM(total_spent) AS segment_revenue
FROM customer_segments
GROUP BY customer_segment, engagement_status
ORDER BY segment_revenue DESC;
-- 7. وظائف النافذة (Window Functions)
SELECT
customer_id,
name,
city,
total_spent,
RANK() OVER (PARTITION BY city ORDER BY total_spent DESC) AS rank_in_city,
ROUND(PERCENT_RANK() OVER (ORDER BY total_spent) * 100, 2) AS percentile_rank,
SUM(total_spent) OVER (PARTITION BY city) AS city_total_spent,
AVG(total_spent) OVER (PARTITION BY city) AS city_avg_spent,
total_spent - AVG(total_spent) OVER (PARTITION BY city) AS diff_from_city_avg
FROM customers
WHERE city IS NOT NULL
ORDER BY city, total_spent DESC;
-- 8. الاستعلامات العودية (Recursive CTEs)
WITH RECURSIVE date_series AS (
SELECT
DATE('2024-01-01') AS date
UNION ALL
SELECT
date + INTERVAL 1 DAY
FROM date_series
WHERE date < '2024-01-31'
),
daily_sales AS (
SELECT
ds.date,
COALESCE(COUNT(DISTINCT o.order_id), 0) AS order_count,
COALESCE(SUM(o.total_amount), 0) AS daily_revenue,
COALESCE(SUM(o.net_amount), 0) AS daily_net_revenue
FROM date_series ds
LEFT JOIN orders o ON ds.date = o.order_date
GROUP BY ds.date
)
SELECT
date,
order_count,
daily_revenue,
daily_net_revenue,
SUM(daily_revenue) OVER (ORDER BY date) AS cumulative_revenue,
AVG(daily_revenue) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS weekly_avg_revenue,
LAG(daily_revenue, 1) OVER (ORDER BY date) AS prev_day_revenue,
ROUND(((daily_revenue - LAG(daily_revenue, 1) OVER (ORDER BY date)) /
LAG(daily_revenue, 1) OVER (ORDER BY date)) * 100, 2) AS growth_percentage
FROM daily_sales
ORDER BY date;
-- 9. الاستعلامات المعقدة مع Subqueries
SELECT
p.product_id,
p.product_name,
p.category,
p.price,
p.stock_quantity,
oi.total_quantity_sold,
oi.total_revenue,
oi.avg_order_quantity,
CASE
WHEN p.stock_quantity = 0 THEN 'نفذت الكمية'
WHEN p.stock_quantity <= 10 THEN 'كمية قليلة'
WHEN p.stock_quantity <= 50 THEN 'كمية متوسطة'
ELSE 'كمية جيدة'
END AS stock_status
FROM products p
LEFT JOIN (
SELECT
product_id,
SUM(quantity) AS total_quantity_sold,
SUM(quantity * unit_price) AS total_revenue,
AVG(quantity) AS avg_order_quantity
FROM order_items
GROUP BY product_id
) oi ON p.product_id = oi.product_id
WHERE p.is_active = TRUE
ORDER BY oi.total_revenue DESC NULLS LAST;
-- 10. إدارة المعاملات (Transactions)
START TRANSACTION;
-- تخفيض سعر المنتجات في فئة معينة
UPDATE products
SET price = price * 0.9 -- تخفيض 10%
WHERE category = 'إلكترونيات'
AND brand = 'سامسونج';
-- تحديث المخزون
UPDATE products p
JOIN (
SELECT
product_id,
SUM(quantity) AS total_ordered
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
GROUP BY product_id
) recent_sales ON p.product_id = recent_sales.product_id
SET p.stock_quantity = p.stock_quantity - recent_sales.total_ordered
WHERE p.stock_quantity >= recent_sales.total_ordered;
-- إضافة سجل مراجعة
INSERT INTO product_audit (product_id, action_type, old_price, new_price, changed_by, change_date)
SELECT
product_id,
'PRICE_UPDATE',
price / 0.9, -- السعر القديم
price, -- السعر الجديد
CURRENT_USER(),
CURRENT_TIMESTAMP()
FROM products
WHERE category = 'إلكترونيات'
AND brand = 'سامسونج';
COMMIT;
-- 11. إنشاء Views و Stored Procedures
CREATE OR REPLACE VIEW customer_summary_view AS
SELECT
c.customer_id,
c.name,
c.email,
c.city,
c.country,
c.registration_date,
c.total_spent,
COALESCE(o.order_count, 0) AS order_count,
COALESCE(o.last_order_date, c.registration_date) AS last_order_date,
CASE
WHEN o.last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) THEN 'نشط'
WHEN o.last_order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 90 DAY) THEN 'شبه نشط'
ELSE 'غير نشط'
END AS activity_status
FROM customers c
LEFT JOIN (
SELECT
customer_id,
COUNT(*) AS order_count,
MAX(order_date) AS last_order_date
FROM orders
GROUP BY customer_id
) o ON c.customer_id = o.customer_id;
-- Stored Procedure لحساب إحصائيات العملاء
DELIMITER //
CREATE PROCEDURE CalculateCustomerStats(
IN p_city VARCHAR(50),
OUT p_total_customers INT,
OUT p_avg_spending DECIMAL(15,2),
OUT p_total_revenue DECIMAL(15,2)
)
BEGIN
-- حساب عدد العملاء
SELECT COUNT(*), AVG(total_spent), SUM(total_spent)
INTO p_total_customers, p_avg_spending, p_total_revenue
FROM customers
WHERE (p_city IS NULL OR city = p_city);
-- تسجيل العملية
INSERT INTO stats_log (operation_type, parameters, result_count, executed_by)
VALUES ('CALCULATE_CUSTOMER_STATS', p_city, p_total_customers, CURRENT_USER());
END //
DELIMITER ;
-- استدعاء الـ Stored Procedure
CALL CalculateCustomerStats('الرياض', @total, @avg, @revenue);
SELECT @total AS total_customers, @avg AS avg_spending, @revenue AS total_revenue;
-- 12. التحسين والـ Explain Plans
EXPLAIN ANALYZE
SELECT
c.customer_id,
c.name,
o.order_id,
o.order_date,
o.total_amount,
oi.quantity,
p.product_name
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE c.city = 'الرياض'
AND o.order_date >= '2024-01-01'
ORDER BY o.total_amount DESC
LIMIT 100;
-- 13. النسخ الاحتياطي والاستعادة
-- نسخ احتياطي للبيانات
SELECT * INTO OUTFILE '/tmp/customers_backup.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM customers;
-- استعادة البيانات
LOAD DATA INFILE '/tmp/customers_backup.csv'
INTO TABLE customers
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';
-- 14. مراقبة الأداء
SHOW PROCESSLIST;
SHOW STATUS LIKE 'Threads_connected';
SHOW ENGINE INNODB STATUS;
-- 15. تنظيف البيانات
-- إزالة العملاء الذين لم يقوموا بأي طلب منذ أكثر من سنة
DELETE FROM customers
WHERE customer_id IN (
SELECT c.customer_id
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id
HAVING MAX(o.order_date) < DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR)
OR MAX(o.order_date) IS NULL
);
-- تحديث الإحصائيات
ANALYZE TABLE customers, orders, products, order_items;
OPTIMIZE TABLE customers, orders, products, order_items;
الخطوة 3: تعلم Apache Spark
Apache Spark لمعالجة كميات ضخمة من البيانات بسرعة وكفاءة. يدعم معالجة البيانات الموزعة باستخدام RDDs (Resilient Distributed Datasets)
الأهمية:
ضروري لمعالجة البيانات الضخمة بشكل سريع وفعال
الأدوات:
Spark Python (واجهة PySpark)
مثال عملي معالجة بيانات باستخدام PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# 1. إنشاء جلسة Spark
spark = SparkSession.builder \
.appName("Data Engineering Pipeline") \
.config("spark.sql.shuffle.partitions", "100") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 2. تحميل البيانات من مصادر مختلفة
# من ملفات CSV
customers_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.csv("hdfs://localhost:9000/data/customers/*.csv")
# من قاعدة بيانات JDBC
jdbc_url = "jdbc:mysql://localhost:3306/ecommerce_db"
properties = {
"user": "root",
"password": "password",
"driver": "com.mysql.jdbc.Driver"
}
orders_df = spark.read \
.jdbc(url=jdbc_url, table="orders", properties=properties)
# من Hive
products_df = spark.sql("SELECT * FROM ecommerce_db.products")
# 3. تحويلات ETL الأساسية
class ETLPipeline:
"""فئة لبناء خط أنابيب ETL"""
def __init__(self, spark_session):
self.spark = spark_session
def extract_data(self, source_config):
"""استخراج البيانات من مصادر مختلفة"""
data_sources = []
# من ملفات
if 'csv_files' in source_config:
df = self.spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(source_config['csv_files'])
data_sources.append(df)
# من قاعدة بيانات
if 'jdbc' in source_config:
df = self.spark.read \
.jdbc(url=source_config['jdbc']['url'],
table=source_config['jdbc']['table'],
properties=source_config['jdbc']['properties'])
data_sources.append(df)
# من Kafka
if 'kafka' in source_config:
df = self.spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", source_config['kafka']['bootstrap_servers']) \
.option("subscribe", source_config['kafka']['topic']) \
.load()
data_sources.append(df)
return data_sources
def transform_data(self, df, transformations):
"""تحويل البيانات"""
transformed_df = df
for transform in transformations:
if transform['type'] == 'filter':
transformed_df = transformed_df.filter(transform['condition'])
elif transform['type'] == 'select':
transformed_df = transformed_df.select(transform['columns'])
elif transform['type'] == 'group_by':
transformed_df = transformed_df.groupBy(transform['columns']) \
.agg(*transform['aggregations'])
elif transform['type'] == 'join':
transformed_df = transformed_df.join(
transform['other_df'],
transform['join_condition'],
transform['join_type']
)
elif transform['type'] == 'window_function':
window_spec = Window.partitionBy(transform['partition_cols']) \
.orderBy(transform['order_cols'])
for col_name, func in transform['functions'].items():
transformed_df = transformed_df.withColumn(
col_name, func().over(window_spec)
)
return transformed_df
def load_data(self, df, destination_config):
"""تحميل البيانات إلى وجهات مختلفة"""
# إلى HDFS
if destination_config.get('hdfs'):
df.write \
.mode(destination_config['hdfs']['mode']) \
.option("header", "true") \
.csv(destination_config['hdfs']['path'])
# إلى قاعدة بيانات
if destination_config.get('jdbc'):
df.write \
.mode(destination_config['jdbc']['mode']) \
.jdbc(url=destination_config['jdbc']['url'],
table=destination_config['jdbc']['table'],
properties=destination_config['jdbc']['properties'])
# إلى Hive
if destination_config.get('hive'):
df.write \
.mode(destination_config['hive']['mode']) \
.saveAsTable(destination_config['hive']['table_name'])
# إلى Kafka
if destination_config.get('kafka'):
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", destination_config['kafka']['bootstrap_servers']) \
.option("topic", destination_config['kafka']['topic']) \
.save()
def run_pipeline(self, source_config, transformations, destination_config):
"""تشغيل خط الأنابيب الكامل"""
print("بدء خط أنابيب ETL...")
# استخراج
data_sources = self.extract_data(source_config)
print(f"تم استخراج {len(data_sources)} مصدر بيانات")
# تحويل
for i, df in enumerate(data_sources):
print(f"تحويل مصدر البيانات {i+1}...")
transformed_df = self.transform_data(df, transformations)
# تحميل
print(f"تحميل مصدر البيانات {i+1}...")
self.load_data(transformed_df, destination_config)
print("اكتمل خط أنابيب ETL!")
# 4. مثال عملي لخط أنابيب ETL
# تكوين المصادر
source_config = {
'csv_files': 'hdfs://localhost:9000/data/raw/*.csv',
'jdbc': {
'url': 'jdbc:mysql://localhost:3306/ecommerce_db',
'table': 'orders',
'properties': {
'user': 'root',
'password': 'password',
'driver': 'com.mysql.jdbc.Driver'
}
}
}
# تكوين التحويلات
transformations = [
{
'type': 'filter',
'condition': col('total_amount') > 1000
},
{
'type': 'select',
'columns': ['customer_id', 'order_date', 'total_amount', 'status']
},
{
'type': 'group_by',
'columns': ['customer_id'],
'aggregations': [
count('*').alias('order_count'),
sum('total_amount').alias('total_spent'),
avg('total_amount').alias('avg_order_value')
]
}
]
# تكوين الوجهات
destination_config = {
'hdfs': {
'path': 'hdfs://localhost:9000/data/processed/customer_summary',
'mode': 'overwrite'
},
'jdbc': {
'url': 'jdbc:mysql://localhost:3306/ecommerce_db',
'table': 'customer_summary',
'properties': {
'user': 'root',
'password': 'password',
'driver': 'com.mysql.jdbc.Driver'
},
'mode': 'overwrite'
}
}
# تشغيل خط الأنابيب
etl_pipeline = ETLPipeline(spark)
etl_pipeline.run_pipeline(source_config, transformations, destination_config)
# 5. معالجة البيانات الموزعة (Distributed Processing)
class DistributedDataProcessor:
"""معالج بيانات موزع"""
def __init__(self, spark_session):
self.spark = spark_session
def process_large_dataset(self, input_path, output_path):
"""معالجة مجموعة بيانات كبيرة"""
# قراءة البيانات
df = self.spark.read.parquet(input_path)
# إعادة التقسيم الأمثل
optimal_partitions = max(1, df.rdd.getNumPartitions() // 10)
df = df.repartition(optimal_partitions, "customer_id")
# معالجة متقدمة
processed_df = df.transform(self._apply_transformations)
# التحسينات
processed_df.cache()
processed_df.count() # تفعيل التخزين المؤقت
# حفظ النتائج
processed_df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet(output_path)
return processed_df
def _apply_transformations(self, df):
"""تطبيق التحويلات على البيانات"""
# تنظيف البيانات
df = df.dropna(subset=["customer_id", "order_date"])
# إضافة أعمدة محسوبة
df = df.withColumn("order_year", year(col("order_date"))) \
.withColumn("order_month", month(col("order_date"))) \
.withColumn("order_quarter", quarter(col("order_date"))) \
.withColumn("order_day", dayofmonth(col("order_date"))) \
.withColumn("order_day_of_week", dayofweek(col("order_date")))
# تجميع البيانات
result = df.groupBy("customer_id", "order_year", "order_month") \
.agg(
count("*").alias("monthly_order_count"),
sum("total_amount").alias("monthly_spent"),
avg("total_amount").alias("avg_order_value"),
max("order_date").alias("last_order_date"),
min("order_date").alias("first_order_date"),
collect_list("order_id").alias("order_ids")
)
return result
def calculate_customer_lifetime_value(self, df):
"""حساب قيمة العميل مدى الحياة (CLV)"""
# تعريف نافذة زمنية
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
clv_df = df.withColumn("order_sequence", row_number().over(window_spec)) \
.withColumn("days_between_orders",
datediff(col("order_date"),
lag("order_date", 1).over(window_spec))) \
.withColumn("cumulative_spent",
sum("total_amount").over(window_spec.rowsBetween(Window.unboundedPreceding, 0))) \
.withColumn("avg_order_frequency",
avg("days_between_orders").over(Window.partitionBy("customer_id"))) \
.withColumn("predicted_future_orders",
when(col("avg_order_frequency").isNotNull(),
365 / col("avg_order_frequency")).otherwise(0)) \
.withColumn("predicted_future_value",
col("predicted_future_orders") * avg("total_amount").over(Window.partitionBy("customer_id")))
return clv_df
# 6. معالجة البيانات في الوقت الفعلي (Stream Processing)
from pyspark.sql.streaming import StreamingQuery
class RealTimeDataProcessor:
"""معالج بيانات في الوقت الفعلي"""
def __init__(self, spark_session):
self.spark = spark_session
def create_kafka_stream(self, bootstrap_servers, topics):
"""إنشاء تدفق من Kafka"""
stream_df = self.spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("subscribe", topics) \
.option("startingOffsets", "latest") \
.load()
# تحويل البيانات
from pyspark.sql.functions import from_json
schema = self._get_order_schema()
parsed_stream = stream_df \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*")
return parsed_stream
def _get_order_schema(self):
"""الحصول على مخطط بيانات الطلبات"""
return StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("order_timestamp", TimestampType()),
StructField("total_amount", DoubleType()),
StructField("items", ArrayType(
StructType([
StructField("product_id", StringType()),
StructField("quantity", IntegerType()),
StructField("price", DoubleType())
])
))
])
def process_real_time_orders(self, stream_df):
"""معالجة الطلبات في الوقت الفعلي"""
# معالجة التدفق
processed_stream = stream_df \
.withWatermark("order_timestamp", "5 minutes") \
.groupBy(
window("order_timestamp", "1 hour"),
"customer_id"
).agg(
count("*").alias("hourly_orders"),
sum("total_amount").alias("hourly_spent"),
avg("total_amount").alias("avg_order_value")
)
return processed_stream
def write_stream_to_sink(self, stream_df, sink_type, **kwargs):
"""كتابة التدفق إلى وجهة"""
if sink_type == "console":
query = stream_df.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
elif sink_type == "kafka":
query = stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kwargs["bootstrap_servers"]) \
.option("topic", kwargs["topic"]) \
.option("checkpointLocation", kwargs["checkpoint_location"]) \
.start()
elif sink_type == "parquet":
query = stream_df.writeStream \
.format("parquet") \
.option("path", kwargs["path"]) \
.option("checkpointLocation", kwargs["checkpoint_location"]) \
.partitionBy(kwargs.get("partition_by", [])) \
.start()
return query
# 7. التحليل والتحسين
class SparkOptimizer:
"""محسن أداء Spark"""
def __init__(self, spark_session):
self.spark = spark_session
def analyze_query_plan(self, df):
"""تحليل خطة الاستعلام"""
df.explain(extended=True)
# عرض إحصائيات التحسين
print("=== إحصائيات التحسين ===")
print(f"عدد الأقسام: {df.rdd.getNumPartitions()}")
print(f"تقدير عدد الصفوف: {df.count()}")
return df
def optimize_dataframe(self, df):
"""تحسين DataFrame"""
# التحقق من عدد الأقسام
current_partitions = df.rdd.getNumPartitions()
if current_partitions > 200:
print(f"تقليل الأقسام من {current_partitions} إلى 200")
df = df.coalesce(200)
# التحقق من حجم البيانات
df.cache()
row_count = df.count()
print(f"عدد الصفوف: {row_count}")
if row_count > 1000000:
print("تطبيق Partitioning على البيانات الكبيرة")
df = df.repartition(200, "customer_id")
return df
def monitor_performance(self, df, operation_name):
"""مراقبة أداء العملية"""
import time
start_time = time.time()
start_memory = self.spark.sparkContext.getExecutorMemoryStatus()
# تنفيذ العملية
result = df.collect()
end_time = time.time()
end_memory = self.spark.sparkContext.getExecutorMemoryStatus()
# حساب المقاييس
execution_time = end_time - start_time
memory_used = sum(end_memory.values()) - sum(start_memory.values())
print(f"=== أداء {operation_name} ===")
print(f"الوقت: {execution_time:.2f} ثانية")
print(f"الذاكرة المستخدمة: {memory_used / (1024**2):.2f} MB")
print(f"عدد الصفوف: {len(result)}")
return result
# 8. التكامل مع أنظمة أخرى
class DataPipelineIntegrator:
"""مكامل خطوط أنابيب البيانات"""
def __init__(self, spark_session):
self.spark = spark_session
def integrate_with_airflow(self, dag_id, task_id, config):
"""التكامل مع Apache Airflow"""
# إنشاء مهمة Airflow
from airflow.operators.python import PythonOperator
def spark_task(**kwargs):
# تكوين Spark
spark = SparkSession.builder \
.appName(f"{dag_id}_{task_id}") \
.config("spark.sql.shuffle.partitions", "100") \
.getOrCreate()
# تنفيذ المهمة
result = self._execute_spark_job(spark, config)
# إرجاع النتيجة إلى Airflow
kwargs['ti'].xcom_push(key='spark_result', value=result)
return result
return PythonOperator(
task_id=task_id,
python_callable=spark_task,
dag=kwargs.get('dag')
)
def _execute_spark_job(self, spark, config):
"""تنفيذ مهمة Spark"""
# قراءة البيانات
df = spark.read.format(config['input_format']) \
.options(**config['input_options']) \
.load(config['input_path'])
# تطبيق التحويلات
for transform in config['transformations']:
df = df.transform(transform)
# حفظ النتائج
df.write.format(config['output_format']) \
.options(**config['output_options']) \
.mode(config['output_mode']) \
.save(config['output_path'])
return df.count()
def integrate_with_kafka(self, bootstrap_servers, topic, schema):
"""التكامل مع Kafka"""
# قراءة من Kafka
df = self.spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("subscribe", topic) \
.load()
# تحويل البيانات
from pyspark.sql.functions import from_json
parsed_df = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
return parsed_df
# 9. اختبار خطوط الأنابيب
import unittest
from pyspark.testing import assertDataFrameEqual
class TestETLPipeline(unittest.TestCase):
"""اختبار خط أنابيب ETL"""
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder \
.appName("ETL Pipeline Tests") \
.master("local[2]") \
.getOrCreate()
def test_data_extraction(self):
"""اختبار استخراج البيانات"""
# إنشاء بيانات تجريبية
test_data = [
(1, "أحمد", 28, "الرياض", 15000.0),
(2, "سارة", 32, "جدة", 25000.5)
]
schema = ["customer_id", "name", "age", "city", "total_spent"]
expected_df = self.spark.createDataFrame(test_data, schema)
# حفظ البيانات التجريبية
expected_df.write.mode("overwrite").csv("test_data.csv")
# قراءة البيانات
actual_df = self.spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("test_data.csv")
# التحقق
assertDataFrameEqual(actual_df, expected_df)
def test_data_transformation(self):
"""اختبار تحويل البيانات"""
# بيانات الإدخال
input_data = [(1, 28, 15000.0), (2, 32, 25000.5)]
input_df = self.spark.createDataFrame(input_data, ["id", "age", "spent"])
# التحويل المتوقع
expected_data = [(1, 28, 15000.0, "شاب"), (2, 32, 25000.5, "شباب")]
expected_df = self.spark.createDataFrame(
expected_data,
["id", "age", "spent", "age_group"]
)
# تطبيق التحويل
actual_df = input_df.withColumn(
"age_group",
when(col("age") < 30, "شاب")
.when(col("age").between(30, 40), "شباب")
.otherwise("كبير")
)
# التحقق
assertDataFrameEqual(actual_df, expected_df)
def test_data_aggregation(self):
"""اختبار تجميع البيانات"""
# بيانات الإدخال
input_data = [
(1, "الرياض", 15000.0),
(2, "الرياض", 25000.5),
(3, "جدة", 18000.0)
]
input_df = self.spark.createDataFrame(
input_data, ["id", "city", "spent"]
)
# التجميع المتوقع
expected_data = [
("الرياض", 2, 20000.25, 40000.5),
("جدة", 1, 18000.0, 18000.0)
]
expected_df = self.spark.createDataFrame(
expected_data,
["city", "customer_count", "avg_spent", "total_spent"]
)
# تطبيق التجميع
actual_df = input_df.groupBy("city").agg(
count("*").alias("customer_count"),
avg("spent").alias("avg_spent"),
sum("spent").alias("total_spent")
)
# التحقق
assertDataFrameEqual(actual_df, expected_df)
# 10. إيقاف جلسة Spark
spark.stop()
الخطوة 4: تعلم Hadoop
Hadoop هو إطار عمل لتخزين ومعالجة البيانات الضخمة باستخدام MapReduce و HDFS (Hadoop Distributed File System)
الأهمية:
الأساس لتخزين البيانات الضخمة عبر أنظمة متعددة
الأدوات:
Hortonworks Sandbox أو Hadoop CLI
مثال عملي تخزين بيانات باستخدام HDFS:
#!/bin/bash # 1. إعداد وتكوين Hadoop # تثبيت Hadoop (لـ Ubuntu/Debian) sudo apt-get update sudo apt-get install -y openjdk-11-jdk wget https://downloads.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz tar -xzf hadoop-3.3.6.tar.gz sudo mv hadoop-3.3.6 /usr/local/hadoop # إعداد متغيرات البيئة echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> ~/.bashrc echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' >> ~/.bashrc source ~/.bashrc # 2. تكوين Hadoop # ملف core-site.xml cat > $HADOOP_HOME/etc/hadoop/core-site.xml << 'EOF'EOF # ملف hdfs-site.xml cat > $HADOOP_HOME/etc/hadoop/hdfs-site.xml << 'EOF' fs.defaultFS hdfs://localhost:9000 hadoop.tmp.dir /tmp/hadoop EOF # ملف mapred-site.xml cat > $HADOOP_HOME/etc/hadoop/mapred-site.xml << 'EOF' dfs.replication 1 dfs.namenode.name.dir /tmp/hadoop/namenode dfs.datanode.data.dir /tmp/hadoop/datanode dfs.permissions false EOF # ملف yarn-site.xml cat > $HADOOP_HOME/etc/hadoop/yarn-site.xml << 'EOF' mapreduce.framework.name yarn mapreduce.jobhistory.address localhost:10020 mapreduce.jobhistory.webapp.address localhost:19888 EOF # 3. تشغيل Hadoop echo "تهيئة HDFS..." hdfs namenode -format echo "بدء خدمات Hadoop..." start-dfs.sh start-yarn.sh echo "التحقق من حالة Hadoop..." jps # 4. أوامر HDFS الأساسية echo "=== أوامر HDFS الأساسية ===" # إنشاء مجلدات echo "إنشاء مجلدات في HDFS..." hdfs dfs -mkdir -p /data/raw hdfs dfs -mkdir -p /data/processed hdfs dfs -mkdir -p /data/backup hdfs dfs -mkdir -p /logs # عرض هيكل المجلدات echo "عرض هيكل HDFS..." hdfs dfs -ls -R / # 5. تحميل البيانات إلى HDFS echo "=== تحميل البيانات إلى HDFS ===" # إنشاء بيانات تجريبية echo "إنشاء بيانات تجريبية..." cat > customers.csv << 'EOF' customer_id,name,email,age,city,total_spent 1,أحمد محمد,ahmed@example.com,28,الرياض,15000.00 2,سارة العتيبي,sara@example.com,32,جدة,25000.50 3,خالد السليم,khaled@example.com,45,الرياض,85000.75 4,نورة القحطاني,noura@example.com,35,الدمام,32000.25 5,محمد الحربي,mohammed@example.com,29,مكة,18000.00 EOF cat > orders.csv << 'EOF' order_id,customer_id,order_date,total_amount,status 1001,1,2024-01-10,3650.00,delivered 1002,2,2024-01-12,2900.00,shipped 1003,3,2024-01-15,1200.00,processing 1004,1,2024-01-18,500.00,pending 1005,4,2024-01-20,1800.00,delivered EOF # تحميل الملفات إلى HDFS echo "تحميل customers.csv إلى HDFS..." hdfs dfs -put customers.csv /data/raw/ echo "تحميل orders.csv إلى HDFS..." hdfs dfs -put orders.csv /data/raw/ # التحقق من الملفات المحملة echo "التحقق من الملفات المحملة..." hdfs dfs -ls /data/raw/ # 6. أوامر HDFS المتقدمة echo "=== أوامر HDFS المتقدمة ===" # عرض محتوى الملفات echo "عرض محتوى customers.csv..." hdfs dfs -cat /data/raw/customers.csv # عرض أول 3 سطور echo "عرض أول 3 سطور من orders.csv..." hdfs dfs -head -3 /data/raw/orders.csv # عرض آخر 3 سطور echo "عرض آخر 3 سطور من orders.csv..." hdfs dfs -tail -3 /data/raw/orders.csv # حجم الملفات echo "حجم الملفات في HDFS..." hdfs dfs -du -h /data/raw/ # المساحة الإجمالية echo "المساحة الإجمالية في HDFS..." hdfs dfs -df -h # 7. نسخ ونقل الملفات echo "=== نسخ ونقل الملفات ===" # نسخ من النظام المحلي إلى HDFS echo "نسخ ملف محلي إلى HDFS..." hdfs dfs -copyFromLocal customers.csv /data/backup/customers_backup.csv # نسخ من HDFS إلى النظام المحلي echo "نسخ من HDFS إلى النظام المحلي..." hdfs dfs -copyToLocal /data/raw/orders.csv ./orders_local.csv # نقل ملفات داخل HDFS echo "نقل ملف داخل HDFS..." hdfs dfs -mv /data/backup/customers_backup.csv /data/processed/ # 8. إدارة الأذونات echo "=== إدارة أذونات الملفات ===" # تغيير مالك الملف echo "تغيير مالك الملف..." hdfs dfs -chown hadoop:hadoop /data/raw/customers.csv # تغيير الصلاحيات echo "تغيير صلاحيات الملف..." hdfs dfs -chmod 755 /data/raw/customers.csv # عرض الأذونات echo "عرض أذونات الملفات..." hdfs dfs -ls -h /data/raw/ # 9. إدارة الحصص (Quotas) echo "=== إدارة حصص التخزين ===" # تعيين حصة للمجلد echo "تعيين حصة تخزين..." hdfs dfsadmin -setSpaceQuota 1G /data/raw # عرض الحصة echo "عرض حصة التخزين..." hdfs dfs -count -q /data/raw # إزالة الحصة echo "إزالة حصة التخزين..." hdfs dfsadmin -clrSpaceQuota /data/raw # 10. فحص وصيانة HDFS echo "=== فحص وصيانة HDFS ===" # فحص نظام الملفات echo "فحص نظام الملفات HDFS..." hdfs fsck / -files -blocks -locations # عرض تقرير HDFS echo "عرض تقرير HDFS..." hdfs dfsadmin -report # عرض حالة NameNode echo "عرض حالة NameNode..." hdfs dfsadmin -printTopology # 11. MapReduce مثال عملي echo "=== تشغيل برنامج MapReduce ===" # إنشاء برنامج MapReduce بسيط (WordCount) cat > WordCount.java << 'EOF' import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.aux-services.mapreduce.shuffle.class org.apache.hadoop.mapred.ShuffleHandler yarn.resourcemanager.hostname localhost
الخطوة 5: تعلم Airflow
Airflow لأتمتة وإدارة سير العمل المتعلقة ببياناتك
الأهمية:
ضروري لأتمتة العمليات اليومية مثل استخراج البيانات وتحويلها (ETL Pipelines) وتحميلها
الأدوات:
Airflow Web Interface
مثال عملي Apache Airflow:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.apache.kafka.operators.kafka import KafkaProducerOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import json
import pandas as pd
import requests
# 1. تعريف DAG الأساسي
default_args = {
'owner': 'data_engineer',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['data_team@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
# 2. إنشاء DAG
dag = DAG(
'data_engineering_pipeline',
default_args=default_args,
description='خط أنابيب هندسة البيانات الكامل',
schedule_interval='0 2 * * *', # تشغيل يومياً في الساعة 2 صباحاً
catchup=False,
tags=['etl', 'data_pipeline', 'analytics'],
)
# 3. مهام البداية والنهاية
start_pipeline = DummyOperator(
task_id='start_pipeline',
dag=dag,
)
end_pipeline = DummyOperator(
task_id='end_pipeline',
dag=dag,
)
# 4. مهمة التحقق من توفر البيانات
check_data_availability = HttpSensor(
task_id='check_data_availability',
http_conn_id='data_api',
endpoint='/api/health',
response_check=lambda response: response.status_code == 200,
timeout=300,
poke_interval=30,
mode='reschedule',
dag=dag,
)
# 5. مهمة استخراج البيانات
def extract_data(**context):
"""استخراج البيانات من مصادر مختلفة"""
execution_date = context['execution_date']
# استخراج من API
api_url = "https://api.example.com/data"
response = requests.get(api_url)
api_data = response.json()
# استخراج من قاعدة بيانات
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="source_db",
user="user",
password="password"
)
query = """
SELECT * FROM sales
WHERE sale_date = %s
"""
db_data = pd.read_sql(query, conn, params=[execution_date])
conn.close()
# حفظ البيانات المستخرجة
api_data.to_json(f'/tmp/api_data_{execution_date}.json')
db_data.to_csv(f'/tmp/db_data_{execution_date}.csv', index=False)
return {
'api_records': len(api_data),
'db_records': len(db_data)
}
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
provide_context=True,
dag=dag,
)
# 6. مهمة تحويل البيانات
def transform_data(**context):
"""تحويل وتنظيف البيانات"""
execution_date = context['execution_date']
ti = context['ti']
extraction_result = ti.xcom_pull(task_ids='extract_data')
# قراءة البيانات المستخرجة
api_data = pd.read_json(f'/tmp/api_data_{execution_date}.json')
db_data = pd.read_csv(f'/tmp/db_data_{execution_date}.csv')
# تحويلات API البيانات
api_transformed = api_data.copy()
api_transformed['processed_date'] = execution_date
api_transformed['data_source'] = 'api'
# تنظيف البيانات
api_transformed = api_transformed.dropna(subset=['customer_id', 'amount'])
api_transformed['amount'] = pd.to_numeric(api_transformed['amount'], errors='coerce')
# تحويلات قاعدة البيانات
db_transformed = db_data.copy()
db_transformed['processed_date'] = execution_date
db_transformed['data_source'] = 'database'
# دمج البيانات
merged_data = pd.concat([api_transformed, db_transformed], ignore_index=True)
# تجميع البيانات
aggregated = merged_data.groupby(['customer_id', 'data_source']).agg({
'amount': ['sum', 'mean', 'count'],
'processed_date': 'max'
}).reset_index()
# حفظ البيانات المحولة
merged_data.to_csv(f'/tmp/merged_data_{execution_date}.csv', index=False)
aggregated.to_csv(f'/tmp/aggregated_data_{execution_date}.csv', index=False)
return {
'total_records': len(merged_data),
'aggregated_records': len(aggregated)
}
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
provide_context=True,
dag=dag,
)
# 7. مهمة تحميل البيانات إلى Data Warehouse
load_to_warehouse = PostgresOperator(
task_id='load_to_warehouse',
postgres_conn_id='warehouse_db',
sql='''
CREATE TABLE IF NOT EXISTS daily_sales (
customer_id INTEGER,
data_source VARCHAR(50),
total_amount DECIMAL(15,2),
avg_amount DECIMAL(10,2),
transaction_count INTEGER,
processed_date DATE,
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COPY daily_sales(customer_id, data_source, total_amount,
avg_amount, transaction_count, processed_date)
FROM '/tmp/aggregated_data_{{ ds }}.csv'
DELIMITER ','
CSV HEADER;
''',
dag=dag,
)
# 8. مهمة معالجة البيانات بـ Spark
process_with_spark = SparkSubmitOperator(
task_id='process_with_spark',
application='/opt/airflow/dags/spark_jobs/data_processing.py',
conn_id='spark_default',
application_args=[
'--input-path', '/tmp/merged_data_{{ ds }}.csv',
'--output-path', '/data/processed/{{ ds }}',
'--execution-date', '{{ ds }}'
],
jars='/opt/airflow/jars/spark-avro_2.12-3.3.0.jar',
total_executor_cores=4,
executor_cores=2,
executor_memory='4g',
driver_memory='2g',
verbose=True,
dag=dag,
)
# 9. مهمة إرسال البيانات إلى Kafka
send_to_kafka = KafkaProducerOperator(
task_id='send_to_kafka',
kafka_config_id='kafka_default',
topic='processed_data',
producer_function='airflow.providers.apache.kafka.producer.json_producer',
producer_function_args=[
'processed_data_topic',
{'data': '/tmp/aggregated_data_{{ ds }}.csv', 'date': '{{ ds }}'}
],
dag=dag,
)
# 10. مهمة مراقبة جودة البيانات
def data_quality_check(**context):
"""فحص جودة البيانات"""
execution_date = context['execution_date']
# قراءة البيانات المحولة
data = pd.read_csv(f'/tmp/merged_data_{execution_date}.csv')
checks = {}
# فحص القيم المفقودة
checks['missing_values'] = data.isnull().sum().sum()
# فحص القيم المتطرفة
numeric_cols = data.select_dtypes(include=['float64', 'int64']).columns
for col in numeric_cols:
q1 = data[col].quantile(0.25)
q3 = data[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outliers = data[(data[col] < lower_bound) | (data[col] > upper_bound)][col].count()
checks[f'outliers_{col}'] = outliers
# فحص التكرارات
checks['duplicates'] = data.duplicated().sum()
# فحص صحة البيانات
checks['negative_amounts'] = data[data['amount'] < 0]['amount'].count() if 'amount' in data.columns else 0
# حفظ نتائج الفحص
with open(f'/tmp/data_quality_{execution_date}.json', 'w') as f:
json.dump(checks, f, indent=2)
# التحقق من معايير الجودة
quality_issues = []
if checks['missing_values'] > 100:
quality_issues.append(f"عدد كبير من القيم المفقودة: {checks['missing_values']}")
if checks['duplicates'] > 50:
quality_issues.append(f"عدد كبير من التكرارات: {checks['duplicates']}")
if quality_issues:
raise ValueError(f"مشاكل جودة البيانات: {', '.join(quality_issues)}")
return checks
quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=data_quality_check,
provide_context=True,
dag=dag,
)
# 11. مهمة إنشاء التقارير
def generate_reports(**context):
"""إنشاء تقارير البيانات"""
execution_date = context['execution_date']
# قراءة البيانات
data = pd.read_csv(f'/tmp/aggregated_data_{execution_date}.csv')
# تقرير المبيعات اليومية
daily_sales_report = data.groupby('data_source').agg({
'total_amount': 'sum',
'transaction_count': 'sum',
'avg_amount': 'mean'
}).reset_index()
daily_sales_report.to_csv(f'/reports/daily_sales_{execution_date}.csv', index=False)
# تقرير العملاء
customer_summary = data.groupby('customer_id').agg({
'total_amount': 'sum',
'transaction_count': 'sum',
'avg_amount': 'mean'
}).reset_index()
customer_summary.to_csv(f'/reports/customer_summary_{execution_date}.csv', index=False)
# تقرير PDF
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
pdf_path = f'/reports/summary_report_{execution_date}.pdf'
c = canvas.Canvas(pdf_path, pagesize=letter)
c.drawString(100, 750, f"تقرير المبيعات - {execution_date}")
c.drawString(100, 730, f"إجمالي المبيعات: {daily_sales_report['total_amount'].sum():,.2f}")
c.drawString(100, 710, f"عدد المعاملات: {daily_sales_report['transaction_count'].sum():,.0f}")
c.save()
return {
'sales_report': f'/reports/daily_sales_{execution_date}.csv',
'customer_report': f'/reports/customer_summary_{execution_date}.csv',
'pdf_report': pdf_path
}
generate_reports_task = PythonOperator(
task_id='generate_reports',
python_callable=generate_reports,
provide_context=True,
dag=dag,
)
# 12. مهمة التنظيف
cleanup_task = BashOperator(
task_id='cleanup_temporary_files',
bash_command='''
rm -f /tmp/api_data_{{ ds }}.json
rm -f /tmp/db_data_{{ ds }}.csv
rm -f /tmp/merged_data_{{ ds }}.csv
rm -f /tmp/aggregated_data_{{ ds }}.csv
rm -f /tmp/data_quality_{{ ds }}.json
''',
dag=dag,
)
# 13. مهمة إرسال الإشعارات
def send_notifications(**context):
"""إرسال إشعارات بنجاح العملية"""
execution_date = context['execution_date']
ti = context['ti']
# جمع نتائج المهام
extraction_result = ti.xcom_pull(task_ids='extract_data')
transformation_result = ti.xcom_pull(task_ids='transform_data')
quality_result = ti.xcom_pull(task_ids='data_quality_check')
report_result = ti.xcom_pull(task_ids='generate_reports')
# إنشاء رسالة الإشعار
message = f"""
✅ اكتمل خط أنابيب البيانات بنجاح
📅 التاريخ: {execution_date}
📊 النتائج:
- البيانات المستخرجة: {extraction_result['api_records']} من API، {extraction_result['db_records']} من قاعدة البيانات
- البيانات المحولة: {transformation_result['total_records']} سجل
- البيانات المجمعة: {transformation_result['aggregated_records']} سجل
- جودة البيانات: ✅ جميع الاختبارات ناجحة
📈 التقارير المولدة:
- تقرير المبيعات: {report_result['sales_report']}
- تقرير العملاء: {report_result['customer_report']}
- تقرير PDF: {report_result['pdf_report']}
⏱️ وقت التشغيل: {{ task_instance.duration }} ثانية
تم بنجاح! 🎉
"""
# إرسال الإشعار (يمكن استخدام Slack، Email، etc.)
print(message)
# هنا يمكن إضافة كود لإرسال الإشعار عبر Slack، Email، etc.
return message
notifications_task = PythonOperator(
task_id='send_notifications',
python_callable=send_notifications,
provide_context=True,
dag=dag,
)
# 14. تعريف تبعيات المهام
start_pipeline >> check_data_availability >> extract_task
extract_task >> transform_task >> quality_check
quality_check >> [load_to_warehouse, process_with_spark, send_to_kafka]
[load_to_warehouse, process_with_spark, send_to_kafka] >> generate_reports_task
generate_reports_task >> cleanup_task >> notifications_task >> end_pipeline
# 15. DAG إضافي للصيانة الأسبوعية
maintenance_dag = DAG(
'weekly_maintenance',
default_args=default_args,
description='صيانة أسبوعية لأنظمة البيانات',
schedule_interval='0 3 * * 0', # كل أحد في الساعة 3 صباحاً
catchup=False,
tags=['maintenance', 'cleanup'],
)
# مهام الصيانة الأسبوعية
optimize_tables = PostgresOperator(
task_id='optimize_database_tables',
postgres_conn_id='warehouse_db',
sql='''
ANALYZE VERBOSE daily_sales;
VACUUM ANALYZE daily_sales;
REINDEX TABLE daily_sales;
''',
dag=maintenance_dag,
)
cleanup_old_data = BashOperator(
task_id='cleanup_old_data',
bash_command='''
# حذف البيانات القديمة (أكثر من 90 يوم)
psql warehouse_db -c "
DELETE FROM daily_sales
WHERE processed_date < CURRENT_DATE - INTERVAL '90 days';
"
# حذف التقارير القديمة
find /reports -name "*.csv" -mtime +30 -delete
find /reports -name "*.pdf" -mtime +30 -delete
''',
dag=maintenance_dag,
)
backup_database = BashOperator(
task_id='backup_database',
bash_command='''
# نسخ احتياطي لقاعدة البيانات
pg_dump warehouse_db > /backups/warehouse_$(date +%Y%m%d).sql
gzip /backups/warehouse_$(date +%Y%m%d).sql
# حذف النسخ الاحتياطية القديمة
find /backups -name "*.sql.gz" -mtime +30 -delete
''',
dag=maintenance_dag,
)
# تبعيات مهام الصيانة
optimize_tables >> cleanup_old_data >> backup_database
# 16. اختبارات Airflow
def test_dag_integrity():
"""اختبار سلامة DAG"""
from airflow.models import DagBag
dag_bag = DagBag()
assert len(dag_bag.import_errors) == 0, f"أخطاء في تحميل DAGs: {dag_bag.import_errors}"
assert 'data_engineering_pipeline' in dag_bag.dags, "DAG غير موجود"
dag = dag_bag.dags['data_engineering_pipeline']
# التحقق من المهام
tasks = dag.tasks
assert len(tasks) == 12, f"عدد المهام المتوقع: 12، الفعلي: {len(tasks)}"
# التحقق من التبعيات
start_task = dag.get_task('start_pipeline')
assert len(start_task.downstream_list) == 1, "مهمة البداية يجب أن يكون لها تابع واحد"
print("✅ جميع اختبارات DAG ناجحة")
if __name__ == "__main__":
test_dag_integrity()
الخطوة 6: تعلم Kafka
Kafka للمعالجة البيانات في الوقت الفعلي (Real-Time Data Streaming)
الأهمية:
ضروري للمعالجة البيانات في الوقت الفعلي، مثل تحليلات الحدث
الأدوات:
Confluent Platform أو Kafka CLI
مثال عملي Apache Kafka:
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError
import json
import time
from datetime import datetime
from typing import Dict, List, Optional
import threading
import logging
# 1. إعداد التسجيل (Logging)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 2. فئة لإدارة Kafka Producer
class DataStreamProducer:
"""منتج تدفق البيانات إلى Kafka"""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.producer = None
self._connect()
def _connect(self):
"""الاتصال بـ Kafka"""
try:
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: str(k).encode('utf-8') if k else None,
acks='all',
retries=3,
compression_type='gzip',
max_in_flight_requests_per_connection=1
)
logger.info("تم الاتصال بـ Kafka Producer بنجاح")
except Exception as e:
logger.error(f"فشل الاتصال بـ Kafka: {e}")
raise
def produce_message(self, topic: str, key: Optional[str], value: Dict):
"""إنتاج رسالة إلى Kafka"""
try:
future = self.producer.send(
topic=topic,
key=key,
value=value
)
# انتظار التأكيد
result = future.get(timeout=10)
logger.info(f"تم إرسال الرسالة إلى {topic} - Partition: {result.partition}, Offset: {result.offset}")
return True
except KafkaError as e:
logger.error(f"فشل إرسال الرسالة: {e}")
return False
def produce_batch(self, topic: str, messages: List[Dict]):
"""إنتاج دفعة من الرسائل"""
success_count = 0
fail_count = 0
for i, message in enumerate(messages):
key = message.get('id', str(i))
if self.produce_message(topic, key, message):
success_count += 1
else:
fail_count += 1
# إضافة تأخير صغير بين الرسائل
time.sleep(0.01)
logger.info(f"الدفعة: {success_count} ناجحة، {fail_count} فاشلة")
return success_count, fail_count
def close(self):
"""إغلاق المنتج"""
if self.producer:
self.producer.flush()
self.producer.close()
logger.info("تم إغلاق Kafka Producer")
# 3. فئة لإدارة Kafka Consumer
class DataStreamConsumer:
"""مستهلك تدفق البيانات من Kafka"""
def __init__(self, bootstrap_servers: List[str], group_id: str):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.consumer = None
self.running = False
self._connect()
def _connect(self):
"""الاتصال بـ Kafka"""
try:
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
auto_offset_reset='earliest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
max_poll_records=100,
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
logger.info("تم الاتصال بـ Kafka Consumer بنجاح")
except Exception as e:
logger.error(f"فشل الاتصال بـ Kafka: {e}")
raise
def subscribe(self, topics: List[str]):
"""الاشتراك في مواضيع"""
self.consumer.subscribe(topics)
logger.info(f"تم الاشتراك في المواضيع: {topics}")
def consume_messages(self, callback):
"""استهلاك الرسائل"""
self.running = True
try:
while self.running:
# جلب الرسائل
batch = self.consumer.poll(timeout_ms=1000)
for topic_partition, messages in batch.items():
logger.info(f"جلب {len(messages)} رسالة من {topic_partition}")
for message in messages:
try:
# معالجة الرسالة
result = callback({
'topic': topic_partition.topic,
'partition': topic_partition.partition,
'offset': message.offset,
'key': message.key,
'value': message.value,
'timestamp': message.timestamp
})
if result is False:
logger.warning("توقف المعالجة بناءً على رد الدالة")
return
except Exception as e:
logger.error(f"خطأ في معالجة الرسالة: {e}")
# التزام بالإزاحة
self.consumer.commit()
except KeyboardInterrupt:
logger.info("توقف المستهلك بواسطة المستخدم")
except Exception as e:
logger.error(f"خطأ في استهلاك الرسائل: {e}")
finally:
self.close()
def consume_single_message(self, timeout_ms: int = 10000):
"""استهلاك رسالة واحدة"""
try:
messages = self.consumer.poll(timeout_ms=timeout_ms)
for topic_partition, msgs in messages.items():
if msgs:
message = msgs[0]
return {
'topic': topic_partition.topic,
'partition': topic_partition.partition,
'offset': message.offset,
'key': message.key,
'value': message.value,
'timestamp': message.timestamp
}
except Exception as e:
logger.error(f"خطأ في استهلاك الرسالة: {e}")
return None
def close(self):
"""إغلاق المستهلك"""
if self.consumer:
self.consumer.close()
logger.info("تم إغلاق Kafka Consumer")
# 4. فئة لإدارة Kafka Topics
class KafkaTopicManager:
"""مدير مواضيع Kafka"""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.admin_client = None
self._connect()
def _connect(self):
"""الاتصال بـ Kafka Admin"""
try:
self.admin_client = KafkaAdminClient(
bootstrap_servers=self.bootstrap_servers,
client_id='kafka_topic_manager'
)
logger.info("تم الاتصال بـ Kafka Admin Client بنجاح")
except Exception as e:
logger.error(f"فشل الاتصال بـ Kafka Admin: {e}")
raise
def create_topic(self, topic_name: str, num_partitions: int = 3, replication_factor: int = 1):
"""إنشاء موضوع جديد"""
try:
topic = NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs={
'retention.ms': '604800000', # 7 أيام
'cleanup.policy': 'delete',
'compression.type': 'gzip'
}
)
self.admin_client.create_topics([topic])
logger.info(f"تم إنشاء الموضوع: {topic_name}")
return True
except Exception as e:
logger.error(f"فشل إنشاء الموضوع {topic_name}: {e}")
return False
def delete_topic(self, topic_name: str):
"""حذف موضوع"""
try:
self.admin_client.delete_topics([topic_name])
logger.info(f"تم حذف الموضوع: {topic_name}")
return True
except Exception as e:
logger.error(f"فشل حذف الموضوع {topic_name}: {e}")
return False
def list_topics(self):
"""عرض جميع المواضيع"""
try:
topics = self.admin_client.list_topics()
logger.info(f"المواضيع المتاحة: {topics}")
return topics
except Exception as e:
logger.error(f"فشل عرض المواضيع: {e}")
return []
def describe_topic(self, topic_name: str):
"""وصف موضوع"""
try:
configs = self.admin_client.describe_configs(
config_resources=[ConfigResource(ConfigResourceType.TOPIC, topic_name)]
)
for config_resource in configs:
logger.info(f"إعدادات الموضوع {topic_name}:")
for key, value in config_resource.items():
logger.info(f" {key}: {value.value}")
return configs
except Exception as e:
logger.error(f"فشل وصف الموضوع {topic_name}: {e}")
return None
def close(self):
"""إغلاق اتصال Admin"""
if self.admin_client:
self.admin_client.close()
logger.info("تم إغلاق Kafka Admin Client")
# 5. فئة لمعالجة تدفق البيانات في الوقت الفعلي
class RealTimeDataProcessor:
"""معالج بيانات في الوقت الفعلي باستخدام Kafka"""
def __init__(self, bootstrap_servers: List[str], source_topic: str, sink_topic: str):
self.bootstrap_servers = bootstrap_servers
self.source_topic = source_topic
self.sink_topic = sink_topic
self.producer = DataStreamProducer(bootstrap_servers)
self.consumer = DataStreamConsumer(bootstrap_servers, 'realtime_processor_group')
self.topic_manager = KafkaTopicManager(bootstrap_servers)
def setup_topics(self):
"""إعداد المواضيع"""
# إنشاء مواضيع إذا لم تكن موجودة
topics = self.topic_manager.list_topics()
if self.source_topic not in topics:
self.topic_manager.create_topic(self.source_topic)
if self.sink_topic not in topics:
self.topic_manager.create_topic(self.sink_topic)
def process_message(self, message: Dict) -> Optional[Dict]:
"""معالجة رسالة فردية"""
try:
raw_data = message['value']
# تحويل البيانات
processed_data = {
'id': raw_data.get('id', message['key']),
'timestamp': datetime.now().isoformat(),
'source_topic': message['topic'],
'source_partition': message['partition'],
'source_offset': message['offset'],
'customer_id': raw_data.get('customer_id'),
'event_type': raw_data.get('event_type'),
'amount': raw_data.get('amount', 0),
'location': raw_data.get('location', {}),
'metadata': {
'processing_time': datetime.now().isoformat(),
'processor_version': '1.0.0'
}
}
# تحويلات إضافية
if 'amount' in raw_data:
processed_data['amount_usd'] = raw_data['amount'] * 3.75 # تحويل إلى دولار
processed_data['amount_category'] = self._categorize_amount(raw_data['amount'])
if 'timestamp' in raw_data:
processed_data['hour_of_day'] = datetime.fromisoformat(raw_data['timestamp']).hour
processed_data['day_of_week'] = datetime.fromisoformat(raw_data['timestamp']).strftime('%A')
# فحص الجودة
if self._validate_data(processed_data):
return processed_data
else:
logger.warning(f"بيانات غير صالحة: {processed_data['id']}")
return None
except Exception as e:
logger.error(f"خطأ في معالجة الرسالة: {e}")
return None
def _categorize_amount(self, amount: float) -> str:
"""تصنيف المبلغ"""
if amount < 100:
return 'صغير'
elif amount < 1000:
return 'متوسط'
else:
return 'كبير'
def _validate_data(self, data: Dict) -> bool:
"""التحقق من صحة البيانات"""
required_fields = ['id', 'customer_id', 'event_type']
for field in required_fields:
if not data.get(field):
return False
if data.get('amount') and data['amount'] < 0:
return False
return True
def start_processing(self):
"""بدء المعالجة"""
logger.info("بدء معالجة البيانات في الوقت الفعلي")
# إعداد المواضيع
self.setup_topics()
# الاشتراك في الموضوع المصدر
self.consumer.subscribe([self.source_topic])
# دالة رد الاتصال للمعالجة
def process_callback(message):
# معالجة الرسالة
processed_data = self.process_message(message)
if processed_data:
# إرسال البيانات المعالجة إلى الموضوع الهدف
success = self.producer.produce_message(
topic=self.sink_topic,
key=processed_data['id'],
value=processed_data
)
if success:
logger.info(f"تم معالجة وإرسال: {processed_data['id']}")
else:
logger.error(f"فشل إرسال: {processed_data['id']}")
return True # مواصلة المعالجة
# بدء استهلاك البيانات
self.consumer.consume_messages(process_callback)
def stop_processing(self):
"""إيقاف المعالجة"""
logger.info("إيقاف معالجة البيانات")
self.consumer.running = False
def close(self):
"""إغلاق جميع الاتصالات"""
self.producer.close()
self.consumer.close()
self.topic_manager.close()
logger.info("تم إغلاق جميع اتصالات Kafka")
# 6. مثال استخدام
if __name__ == "__main__":
# تكوين Kafka
BOOTSTRAP_SERVERS = ['localhost:9092']
SOURCE_TOPIC = 'raw_events'
SINK_TOPIC = 'processed_events'
# إنشاء معالج البيانات
processor = RealTimeDataProcessor(BOOTSTRAP_SERVERS, SOURCE_TOPIC, SINK_TOPIC)
try:
# بدء المعالجة في thread منفصل
processing_thread = threading.Thread(target=processor.start_processing)
processing_thread.start()
# إنتاج بعض البيانات التجريبية
producer = DataStreamProducer(BOOTSTRAP_SERVERS)
# بيانات تجريبية
test_messages = [
{
'id': 'event_001',
'customer_id': 'cust_123',
'event_type': 'purchase',
'amount': 150.50,
'timestamp': datetime.now().isoformat(),
'location': {'city': 'الرياض', 'country': 'السعودية'}
},
{
'id': 'event_002',
'customer_id': 'cust_456',
'event_type': 'view',
'amount': 0,
'timestamp': datetime.now().isoformat(),
'location': {'city': 'جدة', 'country': 'السعودية'}
},
{
'id': 'event_003',
'customer_id': 'cust_789',
'event_type': 'purchase',
'amount': 2500.00,
'timestamp': datetime.now().isoformat(),
'location': {'city': 'الدمام', 'country': 'السعودية'}
}
]
# إرسال البيانات
for message in test_messages:
producer.produce_message(SOURCE_TOPIC, message['id'], message)
time.sleep(1)
# انتظار قليل لمعالجة البيانات
time.sleep(5)
# استهلاك البيانات المعالجة
consumer = DataStreamConsumer(BOOTSTRAP_SERVERS, 'test_consumer_group')
consumer.subscribe([SINK_TOPIC])
print("📥 البيانات المعالجة:")
for _ in range(3):
processed_message = consumer.consume_single_message(5000)
if processed_message:
print(json.dumps(processed_message['value'], indent=2, ensure_ascii=False))
consumer.close()
producer.close()
except KeyboardInterrupt:
print("\nإيقاف المعالجة...")
finally:
processor.stop_processing()
processor.close()
# 7. التكامل مع Spark Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
def create_spark_kafka_stream():
"""إنشاء تدفق Spark من Kafka"""
spark = SparkSession.builder \
.appName("KafkaSparkIntegration") \
.config("spark.sql.shuffle.partitions", "100") \
.getOrCreate()
# قراءة من Kafka
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "processed_events") \
.option("startingOffsets", "latest") \
.load()
# تعريف مخطط البيانات
schema = StructType([
StructField("id", StringType()),
StructField("customer_id", StringType()),
StructField("event_type", StringType()),
StructField("amount", DoubleType()),
StructField("amount_usd", DoubleType()),
StructField("amount_category", StringType()),
StructField("timestamp", TimestampType()),
StructField("hour_of_day", IntegerType()),
StructField("day_of_week", StringType()),
StructField("location", StructType([
StructField("city", StringType()),
StructField("country", StringType())
])),
StructField("metadata", StructType([
StructField("processing_time", TimestampType()),
StructField("processor_version", StringType())
]))
])
# تحويل البيانات
parsed_df = kafka_df \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*")
# معالجة التدفق
processed_stream = parsed_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 hour"),
"event_type",
"amount_category"
).agg(
count("*").alias("event_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
approx_count_distinct("customer_id").alias("unique_customers")
)
return processed_stream
# 8. مراقبة Kafka
class KafkaMonitor:
"""مراقب حالة Kafka"""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
def get_cluster_health(self):
"""الحصول على حالة الكتلة"""
try:
# الحصول على معلومات الكتلة
cluster_description = self.admin_client.describe_cluster()
health_info = {
'cluster_id': cluster_description.cluster_id,
'controller_id': cluster_description.controller.id,
'broker_count': len(cluster_description.brokers),
'brokers': [
{
'id': broker.id,
'host': broker.host,
'port': broker.port,
'rack': broker.rack
}
for broker in cluster_description.brokers
]
}
return health_info
except Exception as e:
logger.error(f"خطأ في الحصول على حالة الكتلة: {e}")
return None
def get_topic_metrics(self, topic_name: str):
"""الحصول على مقاييس الموضوع"""
try:
# الحصول على أقسام الموضوع
partitions = self.admin_client.list_topics().get(topic_name, [])
metrics = {
'topic': topic_name,
'partition_count': len(partitions),
'partitions': []
}
# هنا يمكن إضافة كود للحصول على مقاييس الأقسام
# مثل حجم البيانات، الإزاحات، etc.
return metrics
except Exception as e:
logger.error(f"خطأ في الحصول على مقاييس الموضوع: {e}")
return None
def close(self):
"""إغلاق الاتصال"""
self.admin_client.close()
# 9. اختبارات Kafka
import unittest
class TestKafkaIntegration(unittest.TestCase):
"""اختبار تكامل Kafka"""
def setUp(self):
self.bootstrap_servers = ['localhost:9092']
self.test_topic = 'test_topic'
def test_producer_connection(self):
"""اختبار اتصال المنتج"""
producer = DataStreamProducer(self.bootstrap_servers)
self.assertIsNotNone(producer.producer)
producer.close()
def test_consumer_connection(self):
"""اختبار اتصال المستهلك"""
consumer = DataStreamConsumer(self.bootstrap_servers, 'test_group')
self.assertIsNotNone(consumer.consumer)
consumer.close()
def test_topic_creation(self):
"""اختبار إنشاء الموضوع"""
topic_manager = KafkaTopicManager(self.bootstrap_servers)
success = topic_manager.create_topic(self.test_topic)
self.assertTrue(success)
# التحقق من وجود الموضوع
topics = topic_manager.list_topics()
self.assertIn(self.test_topic, topics)
topic_manager.close()
if __name__ == '__main__':
unittest.main()
هندسة أنظمة البيانات
استخراج البيانات
APIs، قواعد بيانات، ملفات
معالجة البيانات
ETL/ELT، التنظيف، التحويل
تخزين البيانات
Data Warehouses، Data Lakes
التحليل
BI Tools، التحليلات، التقارير
أدوات هندسة البيانات
قواعد البيانات
PostgreSQL، MySQL، MongoDB، Cassandra
معالجة البيانات
Apache Spark، Apache Flink، Apache Beam
إدارة سير العمل
Apache Airflow، Luigi، Prefect
المزايا والتحديات
المزايا
- طلب عالي: هناك طلب كبير على مهندسي البيانات، خاصة في الشركات التي تعتمد على البيانات الضخمة
- أدوات مجانية: معظم الأدوات المستخدمة مثل Apache Spark و Hadoop مجانية ومفتوحة المصدر
- مجتمع كبير: Python و Apache Spark لديهما مجتمعات نشطة توفر الدعم والموارد
- إبداع لا محدود: يمكنك بناء أنظمة معقدة لمعالجة البيانات وتحليلها
- رواتب ممتازة: مهندسو البيانات من أعلى المهن دخلاً في مجال التكنولوجيا
التحديات
- منحنى التعلم الحاد: يتطلب فهماً جيداً لـ Python والرياضيات والإحصاء
- حجم البيانات: قد تواجه تحديات في التعامل مع مجموعات بيانات كبيرة جداً
- تحديثات متكررة: الأدوات والتقنيات تتطور باستمرار، مما يتطلب تحديث المعرفة بشكل منتظم
- تعقيد الأنظمة: أنظمة هندسة البيانات معقدة وتتطلب فهماً عميقاً للهندسة
تخصصات في هندسة البيانات
هندسة ETL
تصميم وتنفيذ خطوط أنابيب استخراج وتحويل وتحميل
هندسة التدفقات
معالجة البيانات في الوقت الفعلي والتدفقات
هندسة المستودعات
تصميم وتنفيذ مستودعات البيانات
الخلاصة
مهندس البيانات مجال متقدم ومطلوب بشدة في سوق العمل. من خلال إتقان Python، SQL، Apache Spark، Hadoop، Airflow، و Kafka، يمكنك بناء أنظمة متكاملة لجمع ومعالجة وتحليل البيانات بكفاءة عالية.
نصائح للبدء:
- ابدأ بإتقان Python و SQL كأساس
- تعلم Apache Spark لمعالجة البيانات الضخمة
- اتقن إدارة قواعد البيانات SQL و NoSQL
- تعلم Airflow لأتمتة سير العمل
- جرب Kafka لمعالجة البيانات في الوقت الفعلي