متخصص البيانات الضخمة
ما هو متخصص البيانات الضخمة؟
متخصص البيانات الضخمة
متخصص البيانات الضخمة يركز على جمع ومعالجة وتحليل كميات هائلة من البيانات باستخدام تقنيات مثل Spark، Hadoop، وقواعد بيانات NoSQL، لاستخراج رؤى قيمة لاتخاذ القرارات.
معالجة البيانات
معالجة كميات هائلة من البيانات بكفاءة
تحليل البيانات
استخراج رؤى قيمة من البيانات
تخزين البيانات
تخزين البيانات الضخمة بأنظمة موزعة
الحوسبة السحابية
استخدام السحابة لمعالجة البيانات
اللغات والأدوات المستخدمة
SQL
لإدارة قواعد البيانات العلائقية باستخدام Room Database
Python/Scala
لكتابة البرامج النصية في Apache Spark و DevOps
Shell Scripting
لأتمتة المهام في أنظمة Linux
JSON/YAML
لتكوين ملفات الإعدادات مثل NoSQL أو Hive
Java
لتطوير تطبيقات Hadoop و MapReduce
R Programming
للتحليل الإحصائي والتخيل البياني للبيانات
مهارات متخصص البيانات الضخمة
Hadoop Ecosystem
إتقان نظام Hadoop وتطبيقاته
Apache Spark
معالجة البيانات الضخمة بسرعة
Hive & SQL
استعلامات البيانات الضخمة
NoSQL Databases
قواعد البيانات غير العلائقية
Data Warehousing
مستودعات البيانات والتحليل
Cloud Computing
معالجة البيانات في السحابة
خارطة التعلم خطوة بخطوة
الخطوة 1: تعلم Hadoop
Hadoop هو إطار عمل لتخزين ومعالجة البيانات الضخمة باستخدام MapReduce و HDFS (Hadoop Distributed File System)
الأهمية:
الأساس لتخزين البيانات الضخمة عبر أنظمة موزعة متعددة
الأدوات:
Hortonworks Sandbox، Hadoop CLI
مثال عملي: تخزين بيانات باستخدام HDFS
# تهيئة HDFS وتشغيل الخدمات
# تشغيل Hadoop في وضع محلي
hdfs namenode -format
start-dfs.sh
start-yarn.sh
# أوامر HDFS الأساسية
# 1. إنشاء مجلد في HDFS
hdfs dfs -mkdir /data
hdfs dfs -mkdir /data/input
hdfs dfs -mkdir /data/output
# 2. رفع ملف محلي إلى HDFS
hdfs dfs -put local_data.csv /data/input/
hdfs dfs -put logs.txt /data/input/
hdfs dfs -copyFromLocal users.csv /data/input/
# 3. عرض محتويات مجلد في HDFS
hdfs dfs -ls /data/input
hdfs dfs -ls -h /data/input # مع حجم الملفات
hdfs dfs -ls -R /data # عرض متكرر
# 4. عرض محتوى ملف في HDFS
hdfs dfs -cat /data/input/local_data.csv
hdfs dfs -head /data/input/local_data.csv # أول 10 أسطر
hdfs dfs -tail /data/input/local_data.csv # آخر 10 أسطر
# 5. نسخ ملف من HDFS إلى النظام المحلي
hdfs dfs -get /data/input/local_data.csv ./local_copy.csv
hdfs dfs -copyToLocal /data/input/logs.txt ./
# 6. حذف ملف أو مجلد من HDFS
hdfs dfs -rm /data/input/temp_file.txt
hdfs dfs -rm -r /data/old_output # حذف مجلد بشكل متكرر
# 7. عرض مساحة التخزين المستخدمة
hdfs dfs -du -h /data # حجم المجلدات
hdfs dfs -df -h # المساحة الإجمالية
# 8. تغيير صلاحيات الملفات
hdfs dfs -chmod 755 /data/input/local_data.csv
hdfs dfs -chown hadoop:hadoop /data/input/
# 9. نقل أو إعادة تسمية الملفات
hdfs dfs -mv /data/input/old_name.csv /data/input/new_name.csv
hdfs dfs -mv /data/input/temp_file.txt /data/processed/
# 10. فحص حالة HDFS
hdfs dfsadmin -report
hdfs fsck / -files -blocks -locations
# برنامج MapReduce بسيط في Java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper
الخطوة 2: تعلم Hive
Hive هي أداة لاستعلامات البيانات باستخدام SQL-like Syntax على Hadoop. تستخدم لتحليل البيانات المخزنة في HDFS
الأهمية:
تجعل استعلامات البيانات الضخمة أسهل باستخدام بناء جملة مشابه لـ SQL
الأدوات:
Hive CLI أو Beeline
مثال عملي: استعلام بيانات باستخدام Hive
-- تشغيل Hive CLI
hive
-- أو استخدام Beeline (مستحب)
beeline -u jdbc:hive2://localhost:10000
-- 1. إنشاء قاعدة بيانات
CREATE DATABASE IF NOT EXISTS sales_db;
USE sales_db;
-- 2. إنشاء جدول داخلي (Managed Table)
CREATE TABLE IF NOT EXISTS customers (
customer_id INT,
name STRING,
email STRING,
age INT,
city STRING,
registration_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- 3. إنشاء جدول خارجي (External Table)
CREATE EXTERNAL TABLE IF NOT EXISTS products (
product_id INT,
product_name STRING,
category STRING,
price DECIMAL(10,2),
stock_quantity INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/data/products';
-- 4. تحميل البيانات من HDFS إلى الجدول
LOAD DATA INPATH '/data/input/customers.csv' INTO TABLE customers;
LOAD DATA INPATH '/data/input/products.txt' INTO TABLE products;
-- 5. استعلامات أساسية
-- عرض أول 10 سجلات
SELECT * FROM customers LIMIT 10;
-- عد السجلات
SELECT COUNT(*) AS total_customers FROM customers;
-- العملاء فوق 30 سنة
SELECT customer_id, name, age, city
FROM customers
WHERE age > 30
ORDER BY age DESC;
-- 6. استعلامات متقدمة
-- تجميع حسب المدينة
SELECT
city,
COUNT(*) AS customer_count,
AVG(age) AS average_age,
MIN(age) AS min_age,
MAX(age) AS max_age
FROM customers
GROUP BY city
HAVING COUNT(*) > 5
ORDER BY customer_count DESC;
-- JOIN بين الجداول
CREATE TABLE orders (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
order_date DATE,
total_amount DECIMAL(10,2)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
LOAD DATA INPATH '/data/input/orders.csv' INTO TABLE orders;
-- استعلام مع JOIN متعدد
SELECT
c.customer_id,
c.name,
c.city,
o.order_id,
o.order_date,
p.product_name,
o.quantity,
o.total_amount
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.total_amount DESC
LIMIT 20;
-- 7. وظائف النافذة (Window Functions)
-- ترتيب العملاء حسب الإنفاق في كل مدينة
SELECT
city,
customer_id,
name,
total_spent,
RANK() OVER (PARTITION BY city ORDER BY total_spent DESC) AS city_rank
FROM (
SELECT
c.city,
c.customer_id,
c.name,
SUM(o.total_amount) AS total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.city, c.customer_id, c.name
) customer_totals;
-- 8. إنشاء Views
CREATE VIEW top_customers AS
SELECT
c.customer_id,
c.name,
c.city,
COUNT(o.order_id) AS order_count,
SUM(o.total_amount) AS total_spent,
AVG(o.total_amount) AS avg_order_value
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.city
HAVING SUM(o.total_amount) > 1000;
-- استعلام من الـ View
SELECT * FROM top_customers ORDER BY total_spent DESC;
-- 9. التحسينات والأداء
-- إضافة Partitions
CREATE TABLE sales_partitioned (
sale_id INT,
product_id INT,
amount DECIMAL(10,2),
sale_date DATE
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;
-- إضافة Partition يدوياً
ALTER TABLE sales_partitioned ADD PARTITION (year=2024, month=1);
ALTER TABLE sales_partitioned ADD PARTITION (year=2024, month=2);
-- تحميل البيانات إلى Partition
LOAD DATA INPATH '/data/sales/2024/01/sales.csv'
INTO TABLE sales_partitioned
PARTITION (year=2024, month=1);
-- استعلام مع Partition Pruning
SELECT * FROM sales_partitioned
WHERE year = 2024 AND month = 1;
-- 10. إدارة الجداول
-- عرض جميع الجداول
SHOW TABLES;
-- عرض هيكل الجدول
DESCRIBE customers;
DESCRIBE FORMATTED customers;
-- تحديث إحصائيات الجدول
ANALYZE TABLE customers COMPUTE STATISTICS;
ANALYZE TABLE customers COMPUTE STATISTICS FOR COLUMNS;
-- تغيير هيكل الجدول
ALTER TABLE customers ADD COLUMNS (phone_number STRING);
ALTER TABLE customers CHANGE COLUMN age customer_age INT;
-- إعادة تسمية الجدول
ALTER TABLE customers RENAME TO clients;
-- حذف الجدول
DROP TABLE IF EXISTS temp_table;
-- 11. التعامل مع أنواع البيانات المتقدمة
CREATE TABLE complex_data (
id INT,
name STRING,
addresses ARRAY,
phone_numbers MAP,
profile STRUCT
)
ROW FORMAT DELIMITED
COLLECTION ITEMS TERMINATED BY ';'
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE;
-- استعلام البيانات المعقدة
SELECT
id,
name,
addresses[0] AS primary_address,
phone_numbers['mobile'] AS mobile_phone,
profile.birth_date
FROM complex_data;
-- 12. تصدير البيانات
INSERT OVERWRITE DIRECTORY '/data/output/top_customers'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
SELECT * FROM top_customers;
-- أو حفظ كجدول جديد
CREATE TABLE top_customers_export AS
SELECT * FROM top_customers;
الخطوة 3: تعلم Spark
Apache Spark هو إطار عمل قوي لمعالجة كميات ضخمة من البيانات بسرعة وكفاءة، يدعم معالجة البيانات الموزعة باستخدام RDDs (Resilient Distributed Datasets)
الأهمية:
ضروري لمعالجة البيانات الضخمة بشكل سريع وفعال
الأدوات:
Java أو Scala (لـ Spark) و Python (واجهة PySpark)
مثال عملي: معالجة بيانات باستخدام PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# 1. إنشاء جلسة Spark
spark = SparkSession.builder \
.appName("BigData Processing") \
.config("spark.sql.shuffle.partitions", "100") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
# 2. تحميل البيانات من مصادر مختلفة
# من ملف CSV
customers_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.csv("/data/input/customers.csv")
# من ملف JSON
products_df = spark.read \
.option("multiLine", "true") \
.json("/data/input/products.json")
# من Hive Table
orders_df = spark.sql("SELECT * FROM sales_db.orders")
# من قاعدة بيانات JDBC
jdbc_url = "jdbc:mysql://localhost:3306/sales_db"
properties = {
"user": "username",
"password": "password",
"driver": "com.mysql.jdbc.Driver"
}
employees_df = spark.read \
.jdbc(url=jdbc_url, table="employees", properties=properties)
# 3. عرض هيكل البيانات
print("=== هيكل جدول العملاء ===")
customers_df.printSchema()
print("=== عينة من البيانات ===")
customers_df.show(5, truncate=False)
print("=== إحصائيات أساسية ===")
customers_df.describe().show()
# 4. التحويلات الأساسية
# تصفية البيانات
young_customers = customers_df.filter(col("age") < 30)
premium_customers = customers_df.filter((col("age") > 25) & (col("city") == "Riyadh"))
# إضافة أعمدة جديدة
customers_with_status = customers_df.withColumn(
"customer_status",
when(col("age") < 25, "Young")
.when(col("age").between(25, 40), "Adult")
.otherwise("Senior")
)
# إعادة تسمية الأعمدة
customers_renamed = customers_df \
.withColumnRenamed("customer_id", "id") \
.withColumnRenamed("registration_date", "reg_date")
# 5. التجميعات والإحصائيات
# تجميع حسب المدينة
city_stats = customers_df.groupBy("city") \
.agg(
count("*").alias("customer_count"),
avg("age").alias("avg_age"),
min("age").alias("min_age"),
max("age").alias("max_age"),
sum(when(col("age") > 30, 1).otherwise(0)).alias("over_30_count")
) \
.orderBy(desc("customer_count"))
city_stats.show()
# 6. JOIN العمليات
# تحميل بيانات الطلبات
orders_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/data/input/orders.csv")
# Inner Join
customer_orders = customers_df.join(
orders_df,
customers_df.customer_id == orders_df.customer_id,
"inner"
)
# Left Join مع تجميع
customer_summary = customers_df.join(
orders_df,
customers_df.customer_id == orders_df.customer_id,
"left"
).groupBy(
customers_df.customer_id,
customers_df.name,
customers_df.city
).agg(
count(orders_df.order_id).alias("order_count"),
sum(orders_df.total_amount).alias("total_spent"),
avg(orders_df.total_amount).alias("avg_order_value"),
max(orders_df.order_date).alias("last_order_date")
).filter(col("total_spent").isNotNull())
# 7. وظائف النافذة (Window Functions)
# تعريف نافذة
window_spec = Window.partitionBy("city").orderBy(desc("total_spent"))
customer_rankings = customer_summary.withColumn(
"city_rank",
rank().over(window_spec)
).withColumn(
"spent_percentile",
percent_rank().over(window_spec)
).withColumn(
"cumulative_spent",
sum("total_spent").over(window_spec.rowsBetween(Window.unboundedPreceding, 0))
)
# 8. التعامل مع البيانات النصية
# تحليل النصوص
text_analysis = customers_df.withColumn(
"name_length",
length(col("name"))
).withColumn(
"email_domain",
split(col("email"), "@").getItem(1)
).withColumn(
"name_upper",
upper(col("name"))
).withColumn(
"name_initials",
concat(
substring(col("name"), 0, 1),
lit("."),
split(col("name"), " ").getItem(1).substr(0, 1)
)
)
# 9. التعامل مع التواريخ
date_analysis = customers_df.withColumn(
"registration_year",
year(col("registration_date"))
).withColumn(
"registration_month",
month(col("registration_date"))
).withColumn(
"registration_quarter",
quarter(col("registration_date"))
).withColumn(
"days_since_registration",
datediff(current_date(), col("registration_date"))
).withColumn(
"registration_year_month",
date_format(col("registration_date"), "yyyy-MM")
)
# 10. تحويل البيانات المعقدة
# من JSON إلى Struct
json_schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("address", StructType([
StructField("street", StringType()),
StructField("city", StringType()),
StructField("zipcode", StringType())
])),
StructField("phones", ArrayType(StringType()))
])
complex_data = spark.read.schema(json_schema).json("/data/complex/customers.json")
# فك الهياكل المعقدة
flattened_data = complex_data.select(
"id",
"name",
col("address.street").alias("street"),
col("address.city").alias("address_city"),
col("address.zipcode").alias("zipcode"),
explode("phones").alias("phone")
)
# 11. التحسينات والأداء
# التخزين المؤقت
customer_summary.cache()
customer_summary.count() # لتفعيل التخزين المؤقت
# إعادة التقسيم
optimized_df = customer_summary.repartition(10, "city")
# الكتابة بتنسيق Parquet (مضغوط)
customer_summary.write \
.mode("overwrite") \
.partitionBy("city") \
.parquet("/data/output/customer_summary_parquet")
# 12. Machine Learning مع Spark MLlib
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
# تحضير البيانات للتعلم الآلي
numeric_cols = ["age", "order_count", "total_spent"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
# تجميع العملاء
kmeans = KMeans(k=4, seed=42, featuresCol="features")
pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(customer_summary)
# التنبؤات
clustered_customers = model.transform(customer_summary)
clustered_customers.select("customer_id", "name", "prediction").show()
# 13. الكتابة إلى مصادر مختلفة
# إلى HDFS بتنسيق CSV
customer_summary.write \
.option("header", "true") \
.mode("overwrite") \
.csv("/data/output/customer_summary")
# إلى Hive Table
customer_summary.write \
.mode("overwrite") \
.saveAsTable("sales_db.customer_summary_spark")
# إلى قاعدة بيانات
customer_summary.write \
.mode("overwrite") \
.jdbc(url=jdbc_url, table="customer_summary", properties=properties)
# 14. Stream Processing
from pyspark.sql.streaming import StreamingQuery
# قراءة من Kafka
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales_topic") \
.load()
# معالجة البيانات المتدفقة
stream_processed = stream_df.select(
from_json(col("value").cast("string"), sales_schema).alias("data")
).select("data.*") \
.groupBy("product_id", window("timestamp", "1 hour")) \
.agg(sum("amount").alias("hourly_sales"))
# الكتابة إلى console
query = stream_processed.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
# 15. إيقاف جلسة Spark
spark.stop()
الخطوة 4: تعلم قواعد بيانات NoSQL
NoSQL Databases مثل MongoDB، Cassandra، HBase تستخدم للتعامل مع بيانات غير منظمة أو شبه منظمة
الأهمية:
ضرورية للتعامل مع بيانات غير منظمة مثل JSON أو XML
الأدوات:
MongoDB Compass، Cassandra CLI
مثال عملي: التخزين واسترجاع بيانات باستخدام MongoDB
// 1. تشغيل MongoDB وتوصيل الـ Shell
mongosh
// أو استخدام MongoDB Compass للواجهة الرسومية
// 2. إنشاء واستخدام قاعدة بيانات
use ecommerce_db;
// 3. إنشاء مجموعة (Collection) وإدراج بيانات
// إدراج مستند واحد
db.customers.insertOne({
"customer_id": 1001,
"name": "أحمد محمد",
"email": "ahmed@example.com",
"age": 28,
"address": {
"street": "شارع الملك فهد",
"city": "الرياض",
"postal_code": "11564",
"country": "السعودية"
},
"phone_numbers": ["+966501234567", "+966502345678"],
"registration_date": new Date("2024-01-15"),
"preferences": {
"language": "ar",
"currency": "SAR",
"notifications": {
"email": true,
"sms": false,
"push": true
}
},
"tags": ["vip", "frequent_buyer", "tech_savvy"]
});
// إدراج مستندات متعددة
db.customers.insertMany([
{
"customer_id": 1002,
"name": "سارة العتيبي",
"email": "sara@example.com",
"age": 32,
"address": {
"street": "حي النخيل",
"city": "جدة",
"postal_code": "21442",
"country": "السعودية"
},
"registration_date": new Date("2023-11-20"),
"total_spent": 12500.50,
"orders_count": 15
},
{
"customer_id": 1003,
"name": "خالد السليم",
"email": "khaled@example.com",
"age": 45,
"address": {
"street": "حي العليا",
"city": "الرياض",
"country": "السعودية"
},
"registration_date": new Date("2024-02-10"),
"total_spent": 8500.75,
"orders_count": 8
}
]);
// 4. استعلامات أساسية
// استرجاع جميع المستندات
db.customers.find();
// استرجاع مع تنسيق جميل
db.customers.find().pretty();
// استرجاع حقول محددة
db.customers.find(
{},
{ "name": 1, "email": 1, "city": "$address.city" }
).pretty();
// 5. استعلامات متقدمة
// استعلام مع شروط
db.customers.find({
"age": { "$gt": 30 }
}).pretty();
// استعلام بشروط متعددة
db.customers.find({
"age": { "$gte": 25, "$lte": 40 },
"address.city": "الرياض"
}).pretty();
// استعلام مع عوامل منطقية
db.customers.find({
"$or": [
{ "age": { "$lt": 25 } },
{ "address.city": "جدة" }
]
}).pretty();
// 6. استعلامات في البيانات المتداخلة
// استعلام في كائن متداخل
db.customers.find({
"address.city": "الرياض",
"preferences.language": "ar"
}).pretty();
// استعلام في مصفوفة
db.customers.find({
"tags": "vip"
}).pretty();
// 7. التجميع (Aggregation)
// تجميع حسب المدينة
db.customers.aggregate([
{
"$group": {
"_id": "$address.city",
"total_customers": { "$sum": 1 },
"average_age": { "$avg": "$age" },
"total_revenue": { "$sum": "$total_spent" },
"min_age": { "$min": "$age" },
"max_age": { "$max": "$age" }
}
},
{
"$sort": { "total_revenue": -1 }
}
]);
// تجميع مع unwind للصفائف
db.customers.aggregate([
{ "$unwind": "$tags" },
{
"$group": {
"_id": "$tags",
"count": { "$sum": 1 },
"customers": { "$push": "$name" }
}
}
]);
// 8. تحديث البيانات
// تحديث حقل واحد
db.customers.updateOne(
{ "customer_id": 1001 },
{ "$set": { "age": 29 } }
);
// تحديث حقول متعددة
db.customers.updateOne(
{ "customer_id": 1001 },
{
"$set": {
"age": 30,
"address.city": "الخبر",
"last_updated": new Date()
}
}
);
// تحديث بإضافة إلى مصفوفة
db.customers.updateOne(
{ "customer_id": 1001 },
{ "$push": { "tags": "early_adopter" } }
);
// تحديث جميع المستندات المطابقة
db.customers.updateMany(
{ "address.city": "الرياض" },
{ "$set": { "region": "المنطقة الوسطى" } }
);
// 9. حذف البيانات
// حذف مستند واحد
db.customers.deleteOne({ "customer_id": 1003 });
// حذف مستندات متعددة
db.customers.deleteMany({ "age": { "$lt": 25 } });
// 10. الفهرسة (Indexing)
// إنشاء فهرس على حقل واحد
db.customers.createIndex({ "customer_id": 1 });
// إنشاء فهرس مركب
db.customers.createIndex({ "address.city": 1, "age": -1 });
// إنشاء فهرس نصي
db.customers.createIndex({ "name": "text", "email": "text" });
// عرض جميع الفهارس
db.customers.getIndexes();
// حذف فهرس
db.customers.dropIndex("customer_id_1");
// 11. التعامل مع التواريخ
// استعلام حسب الشهر
db.customers.find({
"registration_date": {
"$gte": new Date("2024-01-01"),
"$lt": new Date("2024-02-01")
}
});
// تجميع حسب الشهر
db.customers.aggregate([
{
"$group": {
"_id": {
"year": { "$year": "$registration_date" },
"month": { "$month": "$registration_date" }
},
"count": { "$sum": 1 }
}
},
{ "$sort": { "_id.year": 1, "_id.month": 1 } }
]);
// 12. MongoDB مع Python (PyMongo)
from pymongo import MongoClient
from datetime import datetime
import json
# الاتصال بـ MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce_db']
collection = db['customers']
# إدراج بيانات
customer_data = {
"customer_id": 1004,
"name": "نورة القحطاني",
"email": "noura@example.com",
"age": 35,
"address": {
"city": "الدمام",
"country": "السعودية"
},
"registration_date": datetime.now()
}
result = collection.insert_one(customer_data)
print(f"تم إدراج المستند برقم: {result.inserted_id}")
# استعلام البيانات
query = {"address.city": "الرياض"}
customers = collection.find(query)
for customer in customers:
print(customer)
# التجميع
pipeline = [
{
"$group": {
"_id": "$address.city",
"total_customers": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}
},
{"$sort": {"total_customers": -1}}
]
city_stats = collection.aggregate(pipeline)
for stat in city_stats:
print(stat)
# 13. MongoDB مع Spark
from pyspark.sql import SparkSession
# قراءة من MongoDB
spark = SparkSession.builder \
.appName("MongoDB Integration") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/ecommerce_db.customers") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/ecommerce_db.customers_processed") \
.getOrCreate()
# تحميل البيانات
mongo_df = spark.read \
.format("mongo") \
.load()
mongo_df.show()
# معالجة البيانات
processed_df = mongo_df.select(
"customer_id",
"name",
"age",
col("address.city").alias("city"),
col("address.country").alias("country")
).filter(col("age") > 25)
# الكتابة إلى MongoDB
processed_df.write \
.format("mongo") \
.mode("append") \
.option("collection", "processed_customers") \
.save()
# 14. Cassandra مثال
// تشغيل Cassandra CQL Shell
cqlsh
// إنشاء KeySpace
CREATE KEYSPACE IF NOT EXISTS ecommerce
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 1
};
USE ecommerce;
// إنشاء جدول
CREATE TABLE IF NOT EXISTS customers (
customer_id UUID PRIMARY KEY,
name TEXT,
email TEXT,
age INT,
city TEXT,
registration_date TIMESTAMP,
tags SET
);
// إدراج بيانات
INSERT INTO customers (customer_id, name, email, age, city, registration_date, tags)
VALUES (uuid(), 'أحمد محمد', 'ahmed@example.com', 28, 'الرياض', '2024-01-15', {'vip', 'regular'});
// استعلام
SELECT * FROM customers WHERE city = 'الرياض' ALLOW FILTERING;
// 15. HBase مثال
# تشغيل HBase Shell
hbase shell
# إنشاء جدول
create 'customers', 'personal', 'contact', 'preferences'
# إدراج بيانات
put 'customers', '1001', 'personal:name', 'أحمد محمد'
put 'customers', '1001', 'personal:age', '28'
put 'customers', '1001', 'contact:email', 'ahmed@example.com'
put 'customers', '1001', 'contact:city', 'الرياض'
# استرجاع بيانات
get 'customers', '1001'
scan 'customers'
الخطوة 5: تعلم مستودعات البيانات
Data Warehousing هي عملية تخزين البيانات مركزياً لتحليلها وإنشاء تقارير، تستخدم أنظمة مثل Amazon Redshift، Google BigQuery، Snowflake
الأهمية:
ضرورية لإنشاء مستودعات بيانات مركزية لتحليل البيانات الكبيرة
الأدوات:
Amazon Redshift، Google BigQuery، Snowflake
مثال عملي: استعلام بيانات باستخدام Google BigQuery
-- 1. إنشاء مجموعة بيانات (Dataset)
CREATE SCHEMA IF NOT EXISTS `my-project.ecommerce_dataset`
OPTIONS(
location = 'US',
description = 'مستودع بيانات التجارة الإلكترونية'
);
-- 2. إنشاء جداول
-- جدول العملاء
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.customers` (
customer_id INT64,
name STRING,
email STRING,
age INT64,
city STRING,
country STRING,
registration_date DATE,
total_spent NUMERIC(15,2),
orders_count INT64,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
PARTITION BY DATE(registration_date)
CLUSTER BY city, country;
-- جدول المنتجات
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.products` (
product_id INT64,
product_name STRING,
category STRING,
subcategory STRING,
brand STRING,
price NUMERIC(10,2),
cost NUMERIC(10,2),
stock_quantity INT64,
created_at TIMESTAMP
)
PARTITION BY RANGE_BUCKET(product_id, GENERATE_ARRAY(1, 10000, 1000))
CLUSTER BY category, brand;
-- جدول الطلبات
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.orders` (
order_id INT64,
customer_id INT64,
order_date DATE,
order_time TIMESTAMP,
status STRING,
shipping_city STRING,
shipping_country STRING,
total_amount NUMERIC(15,2),
discount_amount NUMERIC(10,2),
net_amount NUMERIC(15,2),
payment_method STRING,
created_at TIMESTAMP
)
PARTITION BY DATE(order_date)
CLUSTER BY status, shipping_country;
-- جدول تفاصيل الطلبات
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.order_items` (
order_item_id INT64,
order_id INT64,
product_id INT64,
quantity INT64,
unit_price NUMERIC(10,2),
total_price NUMERIC(15,2),
discount_percent FLOAT64,
created_at TIMESTAMP
)
PARTITION BY DATE(_PARTITIONTIME)
CLUSTER BY order_id, product_id;
-- 3. تحميل البيانات
-- من Google Cloud Storage
LOAD DATA OVERWRITE `my-project.ecommerce_dataset.customers`
FROM FILES (
format = 'CSV',
uris = ['gs://my-bucket/customers/*.csv']
);
-- أو من جدول آخر
INSERT INTO `my-project.ecommerce_dataset.orders`
SELECT
order_id,
customer_id,
order_date,
order_timestamp,
status,
shipping_city,
shipping_country,
total_amount,
COALESCE(discount_amount, 0) AS discount_amount,
total_amount - COALESCE(discount_amount, 0) AS net_amount,
payment_method,
CURRENT_TIMESTAMP() AS created_at
FROM `source_project.source_dataset.raw_orders`;
-- 4. استعلامات تحليلية
-- تحليل المبيعات اليومية
SELECT
DATE(order_date) AS sales_date,
COUNT(DISTINCT order_id) AS order_count,
COUNT(order_item_id) AS item_count,
SUM(total_amount) AS gross_sales,
SUM(discount_amount) AS total_discounts,
SUM(net_amount) AS net_sales,
AVG(net_amount) AS avg_order_value
FROM `my-project.ecommerce_dataset.orders` o
JOIN `my-project.ecommerce_dataset.order_items` oi
ON o.order_id = oi.order_id
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY sales_date
ORDER BY sales_date DESC;
-- تحليل العملاء
WITH customer_stats AS (
SELECT
c.customer_id,
c.name,
c.city,
c.country,
c.registration_date,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(o.net_amount) AS lifetime_value,
AVG(o.net_amount) AS avg_order_value,
DATE_DIFF(CURRENT_DATE(), c.registration_date, DAY) AS customer_age_days,
MAX(o.order_date) AS last_order_date
FROM `my-project.ecommerce_dataset.customers` c
LEFT JOIN `my-project.ecommerce_dataset.orders` o
ON c.customer_id = o.customer_id
GROUP BY 1, 2, 3, 4, 5
),
customer_segments AS (
SELECT
*,
CASE
WHEN lifetime_value >= 10000 THEN 'Platinum'
WHEN lifetime_value >= 5000 THEN 'Gold'
WHEN lifetime_value >= 1000 THEN 'Silver'
ELSE 'Bronze'
END AS customer_segment,
CASE
WHEN customer_age_days < 90 THEN 'New'
WHEN last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) THEN 'Active'
WHEN last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) THEN 'At Risk'
ELSE 'Inactive'
END AS engagement_status
FROM customer_stats
)
SELECT
customer_segment,
engagement_status,
COUNT(*) AS customer_count,
AVG(lifetime_value) AS avg_lifetime_value,
AVG(total_orders) AS avg_orders,
AVG(customer_age_days) AS avg_customer_age
FROM customer_segments
GROUP BY 1, 2
ORDER BY customer_segment, engagement_status;
-- 5. تحليل المنتجات
SELECT
p.category,
p.subcategory,
p.brand,
COUNT(DISTINCT oi.order_id) AS order_count,
SUM(oi.quantity) AS total_quantity_sold,
SUM(oi.total_price) AS gross_revenue,
SUM(oi.total_price * oi.discount_percent / 100) AS total_discount,
SUM(oi.quantity * p.cost) AS total_cost,
SUM(oi.total_price - (oi.total_price * oi.discount_percent / 100)) AS net_revenue,
SUM(oi.total_price - (oi.total_price * oi.discount_percent / 100) - (oi.quantity * p.cost)) AS gross_profit,
AVG(oi.unit_price) AS avg_selling_price
FROM `my-project.ecommerce_dataset.products` p
JOIN `my-project.ecommerce_dataset.order_items` oi
ON p.product_id = oi.product_id
JOIN `my-project.ecommerce_dataset.orders` o
ON oi.order_id = o.order_id
WHERE o.order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY 1, 2, 3
ORDER BY gross_profit DESC
LIMIT 20;
-- 6. تحليل جغرافي
SELECT
o.shipping_country,
o.shipping_city,
COUNT(DISTINCT o.order_id) AS order_count,
COUNT(DISTINCT o.customer_id) AS customer_count,
SUM(o.net_amount) AS total_revenue,
AVG(o.net_amount) AS avg_order_value,
SUM(oi.quantity) AS total_items_sold
FROM `my-project.ecommerce_dataset.orders` o
JOIN `my-project.ecommerce_dataset.order_items` oi
ON o.order_id = oi.order_id
WHERE o.order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
GROUP BY 1, 2
ORDER BY total_revenue DESC;
-- 7. تحليل التسلسل الزمني
SELECT
EXTRACT(YEAR FROM order_date) AS year,
EXTRACT(MONTH FROM order_date) AS month,
EXTRACT(QUARTER FROM order_date) AS quarter,
COUNT(DISTINCT order_id) AS order_count,
SUM(net_amount) AS monthly_revenue,
LAG(SUM(net_amount), 1) OVER (ORDER BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date)) AS prev_month_revenue,
SUM(net_amount) - LAG(SUM(net_amount), 1) OVER (ORDER BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date)) AS revenue_growth,
(SUM(net_amount) - LAG(SUM(net_amount), 1) OVER (ORDER BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date))) /
LAG(SUM(net_amount), 1) OVER (ORDER BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date)) * 100 AS growth_percentage
FROM `my-project.ecommerce_dataset.orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 24 MONTH)
GROUP BY 1, 2, 3
ORDER BY 1, 2;
-- 8. إنشاء Views للتقارير
CREATE OR REPLACE VIEW `my-project.ecommerce_dataset.daily_sales_report` AS
SELECT
DATE(order_date) AS report_date,
COUNT(DISTINCT order_id) AS total_orders,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(net_amount) AS daily_revenue,
SUM(CASE WHEN status = 'completed' THEN net_amount ELSE 0 END) AS completed_revenue,
SUM(CASE WHEN status = 'cancelled' THEN net_amount ELSE 0 END) AS cancelled_revenue,
AVG(net_amount) AS avg_order_value
FROM `my-project.ecommerce_dataset.orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY 1;
-- 9. الجداول المجدولة (Scheduled Tables)
-- إنشاء جدول مجدول للتحديث اليومي
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.daily_customer_summary`
PARTITION BY summary_date
CLUSTER BY city, country
AS
SELECT
CURRENT_DATE() AS summary_date,
c.city,
c.country,
COUNT(DISTINCT c.customer_id) AS total_customers,
COUNT(DISTINCT o.order_id) AS daily_orders,
SUM(o.net_amount) AS daily_revenue,
AVG(o.net_amount) AS avg_order_value
FROM `my-project.ecommerce_dataset.customers` c
LEFT JOIN `my-project.ecommerce_dataset.orders` o
ON c.customer_id = o.customer_id
AND DATE(o.order_date) = CURRENT_DATE()
GROUP BY 1, 2, 3;
-- 10. UDFs مخصصة
CREATE OR REPLACE FUNCTION `my-project.ecommerce_dataset.calculate_age`(birth_date DATE)
RETURNS INT64
AS (
DATE_DIFF(CURRENT_DATE(), birth_date, YEAR)
);
-- استخدام الـ UDF
SELECT
name,
`my-project.ecommerce_dataset.calculate_age`(birth_date) AS age
FROM `my-project.ecommerce_dataset.customers`;
-- 11. التعلم الآلي مع BigQuery ML
-- إنشاء نموذج للتنبؤ بقيمة الطلب
CREATE OR REPLACE MODEL `my-project.ecommerce_dataset.order_value_model`
OPTIONS(
model_type = 'linear_reg',
input_label_cols = ['net_amount']
) AS
SELECT
customer_id,
EXTRACT(MONTH FROM order_date) AS order_month,
EXTRACT(DAYOFWEEK FROM order_date) AS order_day_of_week,
shipping_country,
payment_method,
COUNT(order_id) OVER (PARTITION BY customer_id) AS customer_total_orders,
AVG(net_amount) OVER (PARTITION BY customer_id) AS customer_avg_order_value,
net_amount
FROM `my-project.ecommerce_dataset.orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY);
-- تقييم النموذج
SELECT * FROM ML.EVALUATE(MODEL `my-project.ecommerce_dataset.order_value_model`);
-- التنبؤ
SELECT
customer_id,
predicted_net_amount,
net_amount AS actual_value
FROM ML.PREDICT(
MODEL `my-project.ecommerce_dataset.order_value_model`,
SELECT * FROM `my-project.ecommerce_dataset.orders`
WHERE order_date = CURRENT_DATE()
);
-- 12. تصدير البيانات
-- إلى Google Cloud Storage
EXPORT DATA
OPTIONS(
uri = 'gs://my-bucket/exports/sales_*.csv',
format = 'CSV',
overwrite = true,
header = true,
field_delimiter = ','
) AS
SELECT * FROM `my-project.ecommerce_dataset.daily_sales_report`;
-- 13. مراقبة التكاليف والأداء
-- مراقبة استعلامات اليوم
SELECT
job_id,
query,
total_bytes_processed,
total_slot_ms,
creation_time,
end_time,
state
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
ORDER BY total_bytes_processed DESC
LIMIT 10;
-- 14. إدارة الصلاحيات
-- منح صلاحيات للقراءة فقط
GRANT `roles/bigquery.dataViewer`
ON SCHEMA `my-project.ecommerce_dataset`
TO "user:analyst@company.com";
-- منح صلاحيات للكتابة
GRANT `roles/bigquery.dataEditor`
ON TABLE `my-project.ecommerce_dataset.customers`
TO "user:developer@company.com";
-- 15. تنظيف البيانات القديمة
-- حذف البيانات القديمة (أكثر من 3 سنوات)
DELETE FROM `my-project.ecommerce_dataset.orders`
WHERE order_date < DATE_SUB(CURRENT_DATE(), INTERVAL 3 YEAR);
-- أو أرشفة البيانات
CREATE OR REPLACE TABLE `my-project.ecommerce_dataset.orders_archive`
PARTITION BY order_date
CLUSTER BY status
AS
SELECT * FROM `my-project.ecommerce_dataset.orders`
WHERE order_date < DATE_SUB(CURRENT_DATE(), INTERVAL 1 YEAR);
-- ثم حذف من الجدول الرئيسي
DELETE FROM `my-project.ecommerce_dataset.orders`
WHERE order_date < DATE_SUB(CURRENT_DATE(), INTERVAL 1 YEAR);
هندسة البيانات الضخمة
مصادر البيانات
قواعد بيانات، ملفات، APIs
التجميع
Kafka، Flume، Sqoop
المعالجة
Spark، Hadoop، Flink
التحليل
Hive، Pig، Presto
أدوات البيانات الضخمة
Hadoop Ecosystem
إطار عمل مفتوح المصدر لتخزين ومعالجة البيانات الضخمة
Apache Spark
محرك معالجة بيانات سريع ومتوازي
Data Warehouses
أنظمة تخزين مركزية للتحليل وإعداد التقارير
المزايا والتحديات
المزايا
- طلب عالي: هناك طلب كبير على مختصي البيانات الضخمة، خاصة في الشركات التي تعتمد على البيانات الكبيرة
- أدوات مجانية: معظم الأدوات المستخدمة مثل Spark، Hadoop مجانية ومفتوحة المصدر
- مجتمع كبير: Hadoop و Spark لديهما مجتمعات نشطة توفر الدعم والموارد
- إبداع لا محدود: يمكنك بناء أنظمة معقدة لمعالجة البيانات وتحليلها
- رواتب عالية: متخصصو البيانات الضخمة من أكثر المهن دخلاً في مجال التكنولوجيا
التحديات
- منحنى التعلم الحاد: يتطلب فهماً جيداً لـ SQL والرياضيات والإحصاء
- حجم البيانات: قد تواجه تحديات في التعامل مع مجموعات بيانات كبيرة جداً
- تحديثات متكررة: الأدوات والتقنيات تتطور باستمرار، مما يتطلب تحديث المعرفة بشكل منتظم
- تعقيد الأنظمة: أنظمة البيانات الضخمة معقدة وتتطلب فهماً عميقاً للبنية التحتية
تخصصات في البيانات الضخمة
تحليل البيانات
تحليل البيانات الضخمة لاستخراج رؤى قابلة للتنفيذ
هندسة البيانات
بناء وتصميم أنظمة البيانات الضخمة
علم البيانات
تطوير نماذج تعلم آلي على البيانات الضخمة
الخلاصة
متخصص البيانات الضخمة مجال متقدم ومطلوب بشدة في سوق العمل. من خلال إتقان Hadoop، Spark، Hive، وقواعد البيانات NoSQL، يمكنك بناء أنظمة متكاملة لمعالجة وتحليل البيانات الضخمة واستخراج رؤى قيمة للشركات.
نصائح للبدء:
- ابدأ بتعلم SQL الأساسي والمتقدم
- تخصص في Python للبرمجة وتحليل البيانات
- تعلم Hadoop Ecosystem ومكوناته الأساسية
- اتقن Apache Spark لمعالجة البيانات بسرعة
- جرب أنظمة قواعد البيانات NoSQL المختلفة