محقق الطب الشرعي الرقمي
ما هو محقق الطب الشرعي الرقمي؟
باحث الأدلة الرقمية
محقق الطب الشرعي الرقمي يركز على جمع الأدلة الرقمية وتحليلها باستخدام أدوات مثل FTK Imager و Splunk، بهدف حل القضايا الجنائية وتحقيق العدالة.
جمع الأدلة
تجمع وتحفظ الأدلة الرقمية بشكل قانوني
تحليل الأدلة
فحص البيانات والملفات والاتصالات
التقارير
إنشاء تقارير مفصلة للسلطات القانونية
إثبات الأدلة
تقديم الأدلة في المحاكم بشكل مقبول قانونياً
اللغات والأدوات المستخدمة
Python
اللغة الأساسية للتعامل مع البيانات وتحليل النماذج
Shell Scripting
لأتمتة العمليات في أنظمة Linux/Unix
NoSQL/Hive
قواعد البيانات غير العلائقية لتحليل البيانات
JSON/YAML
تكوين ملفات الإعدادات والمعلمات
Markdown
إنشاء تقارير واضحة ومفصلة
Jupyter Notebook
توثيق التحليلات بشكل تفاعلي
مهارات محقق الطب الشرعي الرقمي
جمع الأدلة الرقمية
استخدام FTK Imager و Autopsy لجمع الأدلة
استعادة الملفات
استخدام TestDisk و PhotoRec لاستعادة البيانات
تحليل السجلات
استخدام Splunk و ELK Stack لتحليل السجلات
الأدوات الرقمية
FTK Imager، Autopsy، TestDisk، PhotoRec
تحليل الأدلة
فحص البيانات والملفات والاتصالات الرقمية
إعداد التقارير
إنشاء تقارير مفصلة للسلطات القانونية
خارطة التعلم خطوة بخطوة
الخطوة 1: تعلم جمع الأدلة الرقمية
جمع الأدلة الرقمية هو عملية جمع الأدلة الرقمية من الأجهزة مثل الهواتف الذكية، الحواسيب، والأقراص الصلبة. يتم استخدام هذه الأدلة في التحقيقات الجنائية.
الأهمية:
الأساس لفهم كيفية جمع الأدلة الرقمية بطريقة قانونية وموثوقة
الأدوات:
FTK Imager، Autopsy
أمثلة جمع الأدلة:
# ===== جمع الأدلة الرقمية باستخدام FTK Imager =====
# 1. إنشاء صورة قرص باستخدام FTK Imager
# خطوات استخدام واجهة المستخدم:
# File > Create Disk Image > Select Source > Save Image
# 2. سكربت bash لأتمتة جمع الأدلة
cat > collect_evidence.sh << 'EOF'
#!/bin/bash
# سكربت جمع الأدلة الرقمية
echo "بدء عملية جمع الأدلة الرقمية..."
echo "=================================="
# معلومات النظام
echo "1. جمع معلومات النظام:"
uname -a > system_info.txt
lsb_release -a >> system_info.txt
echo "تم حفظ معلومات النظام في system_info.txt"
# معلومات المستخدمين
echo "2. جمع معلومات المستخدمين:"
cat /etc/passwd > users_info.txt
cat /etc/group >> users_info.txt
echo "تم حفظ معلومات المستخدمين في users_info.txt"
# العمليات الجارية
echo "3. جمع معلومات العمليات:"
ps aux > processes.txt
netstat -tulpn > network_connections.txt
echo "تم حفظ معلومات العمليات في processes.txt"
# السجلات
echo "4. جمع السجلات المهمة:"
mkdir -p logs
cp /var/log/auth.log logs/
cp /var/log/syslog logs/
cp /var/log/kern.log logs/
cp /var/log/apache2/access.log logs/ 2>/dev/null || true
cp /var/log/apache2/error.log logs/ 2>/dev/null || true
# معلومات القرص
echo "5. جمع معلومات القرص:"
df -h > disk_usage.txt
fdisk -l > disk_partitions.txt
lsblk > block_devices.txt
echo "تم حفظ معلومات القرص في ملفات منفصلة"
# إنشاء هاش للأدلة
echo "6. إنشاء هاش للأدلة:"
find . -type f -exec sha256sum {} \; > evidence_hashes.txt
echo "تم إنشاء هاش SHA256 لكل ملف"
# ضغط الأدلة
echo "7. ضغط الأدلة:"
timestamp=$(date +%Y%m%d_%H%M%S)
tar -czf evidence_$timestamp.tar.gz *.txt logs/
echo "تم ضغط الأدلة في evidence_$timestamp.tar.gz"
# حساب هاش الأرشيف
sha256sum evidence_$timestamp.tar.gz > evidence_hash.txt
echo "تم حساب هاش الأرشيف:"
cat evidence_hash.txt
echo "تم الانتهاء من جمع الأدلة!"
EOF
chmod +x collect_evidence.sh
sudo ./collect_evidence.sh
# 3. برنامج Python لجمع الأدلة
import hashlib
import os
import json
from datetime import datetime
class DigitalEvidenceCollector:
def __init__(self, case_number, investigator):
self.case_number = case_number
self.investigator = investigator
self.evidence = {
'metadata': {},
'files': [],
'timeline': [],
'hashes': {}
}
def collect_system_info(self):
"""جمع معلومات النظام"""
import platform
import socket
self.evidence['metadata']['system'] = {
'hostname': socket.gethostname(),
'os': platform.system(),
'os_version': platform.version(),
'processor': platform.processor(),
'architecture': platform.architecture()[0]
}
# إضافة إلى الجدول الزمني
self.evidence['timeline'].append({
'timestamp': datetime.now().isoformat(),
'action': 'collected_system_info',
'description': 'جمع معلومات النظام الأساسية'
})
return self.evidence['metadata']['system']
def hash_file(self, file_path):
"""إنشاء هاش للملف"""
hash_sha256 = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
file_hash = hash_sha256.hexdigest()
self.evidence['hashes'][file_path] = file_hash
return file_hash
except Exception as e:
print(f"خطأ في حساب هاش الملف {file_path}: {e}")
return None
def collect_evidence_files(self, directory, extensions=None):
"""جمع الملفات كأدلة"""
evidence_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if extensions:
if any(file.endswith(ext) for ext in extensions):
file_path = os.path.join(root, file)
evidence_files.append(file_path)
else:
file_path = os.path.join(root, file)
evidence_files.append(file_path)
# حساب الهاش للملفات
for file_path in evidence_files[:10]: # أول 10 ملفات كمثال
file_hash = self.hash_file(file_path)
self.evidence['files'].append({
'path': file_path,
'hash': file_hash,
'size': os.path.getsize(file_path),
'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
})
return evidence_files
def analyze_logs(self, log_file):
"""تحليل سجلات النظام"""
suspicious_patterns = [
'Failed password',
'Invalid user',
'authentication failure',
'unauthorized',
'access denied',
'malware',
'virus detected'
]
findings = []
try:
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for i, line in enumerate(lines):
for pattern in suspicious_patterns:
if pattern.lower() in line.lower():
findings.append({
'line_number': i + 1,
'pattern': pattern,
'content': line.strip(),
'timestamp': datetime.now().isoformat()
})
except Exception as e:
print(f"خطأ في قراءة ملف السجل {log_file}: {e}")
return findings
def generate_report(self):
"""إنشاء تقرير الأدلة"""
report = {
'case_number': self.case_number,
'investigator': self.investigator,
'collection_date': datetime.now().isoformat(),
'evidence_summary': {
'total_files': len(self.evidence['files']),
'total_hashes': len(self.evidence['hashes']),
'timeline_entries': len(self.evidence['timeline'])
},
'system_info': self.evidence['metadata'].get('system', {}),
'suspicious_findings': [],
'chain_of_custody': self.evidence['timeline']
}
# حفظ التقرير
report_filename = f"digital_evidence_report_{self.case_number}.json"
with open(report_filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"تم إنشاء تقرير الأدلة: {report_filename}")
return report_filename
# تشغيل المجمع
if __name__ == "__main__":
print("بدء جمع الأدلة الرقمية...")
print("=" * 40)
collector = DigitalEvidenceCollector(
case_number="CASE-2024-001",
investigator="محقق رقمي"
)
# جمع معلومات النظام
system_info = collector.collect_system_info()
print("✅ معلومات النظام:")
for key, value in system_info.items():
print(f" {key}: {value}")
# جمع ملفات الأدلة
print("\n🔍 جمع ملفات الأدلة...")
evidence_files = collector.collect_evidence_files(
directory="/var/log",
extensions=[".log", ".txt"]
)
print(f"✅ تم جمع {len(evidence_files)} ملف كأدلة")
# تحليل السجلات
print("\n📊 تحليل سجلات النظام...")
log_findings = collector.analyze_logs("/var/log/auth.log")
print(f"✅ تم العثور على {len(log_findings)} نشاط مشبوه")
# إنشاء التقرير
print("\n📄 إنشاء تقرير الأدلة...")
report_file = collector.generate_report()
print(f"✅ تم حفظ التقرير في: {report_file}")
print("\n" + "=" * 40)
print("✅ اكتمل جمع الأدلة الرقمية بنجاح")
# 4. أوامر Linux لجمع الأدلة
echo "أمثلة أوامر جمع الأدلة:"
# جمع معلومات النظام
uname -a
cat /etc/os-release
hostnamectl
# جمع معلومات الشبكة
ip addr show
netstat -tulpn
ss -tulpn
# جمع معلومات المستخدمين
cat /etc/passwd
cat /etc/group
last
lastlog
# جمع معلومات القرص
df -h
lsblk
fdisk -l
# جمع السجلات
journalctl -xe
dmesg | tail -50
cat /var/log/syslog | tail -100
# حساب هاش الملفات
sha256sum important_file.txt
md5sum evidence_file.bin
# البحث عن ملفات معينة
find / -name "*.conf" -type f 2>/dev/null
find / -perm -4000 -type f 2>/dev/null # ملفات SUID
# إنشاء صورة للقرص
# dd if=/dev/sda of=disk_image.img bs=4M status=progress
الخطوة 2: تعلم استعادة الملفات
استعادة الملفات هو عملية استعادة الملفات المحذوفة أو التالفة من الأجهزة. يستخدم لاستعادة البيانات المفقودة بسبب الحوادث أو الهجمات السيبرانية.
الأهمية:
ضروري لفهم كيفية استعادة البيانات المهمة واستخدامها كدليل رقمي
الأدوات:
TestDisk، PhotoRec
أمثلة استعادة الملفات:
# ===== استعادة الملفات باستخدام TestDisk و PhotoRec =====
# 1. استخدام TestDisk لاستعادة الملفات المحذوفة
echo "1. استعادة الملفات باستخدام TestDisk:"
# بدء TestDisk
# sudo testdisk
# خطوات TestDisk:
# 1. Create a new log file
# 2. Select disk (e.g., /dev/sda)
# 3. Select partition table type (usually Intel)
# 4. Choose [Analyse] > [Quick Search]
# 5. Select partition > [Type] to change filesystem type if needed
# 6. Choose [Advanced] > [Undelete]
# 7. Select files to recover > Copy to safe location
# 2. استخدام PhotoRec لاستعادة البيانات
echo "2. استعادة البيانات باستخدام PhotoRec:"
# بدء PhotoRec
# sudo photorec
# خطوات PhotoRec:
# 1. Select disk
# 2. Choose partition
# 3. Select filesystem (FAT, NTFS, ext2/ext3/ext4)
# 4. Choose file types to recover
# 5. Select destination folder
# 6. Start recovery
# 3. سكربت bash لأتمتة استعادة الملفات
cat > file_recovery.sh << 'EOF'
#!/bin/bash
# سكربت استعادة الملفات المتقدمة
echo "بدء عملية استعادة الملفات..."
echo "============================="
RECOVERY_DIR="recovered_files_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$RECOVERY_DIR"
# 1. البحث عن الملفات المحذوفة حديثاً
echo "1. البحث عن الملفات المحذوفة..."
deleted_files=$(find / -type f -name "*.tmp" -o -name "*.bak" -o -name "*.old" 2>/dev/null | head -20)
if [ -n "$deleted_files" ]; then
echo "✅ تم العثور على ملفات محذوفة محتملة:"
echo "$deleted_files" | head -5
else
echo "❌ لم يتم العثور على ملفات محذوفة"
fi
# 2. استعادة ملفات السجل
echo -e "\n2. استعادة ملفات السجل..."
log_files=(
"/var/log/auth.log"
"/var/log/syslog"
"/var/log/kern.log"
"/var/log/dmesg"
)
for log_file in "${log_files[@]}"; do
if [ -f "$log_file" ]; then
cp "$log_file" "$RECOVERY_DIR/"
echo " ✅ تم نسخ: $(basename $log_file)"
fi
done
# 3. استعادة ملفات التكوين
echo -e "\n3. استعادة ملفات التكوين..."
config_files=$(find /etc -name "*.conf" -type f 2>/dev/null | head -10)
for config in $config_files; do
cp "$config" "$RECOVERY_DIR/"
echo " ✅ تم نسخ: $(basename $config)"
done
# 4. البحث عن ملفات مؤقتة
echo -e "\n4. البحث عن الملفات المؤقتة..."
temp_dirs=("/tmp" "/var/tmp" "~/.cache" "~/.local/share/Trash")
for temp_dir in "${temp_dirs[@]}"; do
if [ -d "$temp_dir" ]; then
find "$temp_dir" -type f -mtime -7 2>/dev/null | head -5 | while read file; do
rel_path=$(echo "$file" | sed 's|/|_|g')
cp "$file" "$RECOVERY_DIR/temp_${rel_path##*_}"
done
fi
done
# 5. إنشاء قائمة بالملفات المستعادة
echo -e "\n5. إنشاء قائمة الملفات المستعادة..."
find "$RECOVERY_DIR" -type f > "$RECOVERY_DIR/recovery_list.txt"
file_count=$(wc -l < "$RECOVERY_DIR/recovery_list.txt")
# 6. حساب هاش الملفات المستعادة
echo -e "\n6. حساب هاش الملفات..."
find "$RECOVERY_DIR" -type f -exec sha256sum {} \; > "$RECOVERY_DIR/file_hashes.txt"
echo -e "\n✅ تم استعادة $file_count ملف"
echo " المجلد: $RECOVERY_DIR"
echo " قائمة الملفات: $RECOVERY_DIR/recovery_list.txt"
echo " هاش الملفات: $RECOVERY_DIR/file_hashes.txt"
EOF
chmod +x file_recovery.sh
sudo ./file_recovery.sh
# 4. برنامج Python لاستعادة الملفات
import os
import shutil
import hashlib
from datetime import datetime, timedelta
class FileRecoveryTool:
def __init__(self, recovery_path="recovered_data"):
self.recovery_path = recovery_path
self.recovered_files = []
if not os.path.exists(recovery_path):
os.makedirs(recovery_path)
def recover_deleted_files_by_signature(self, disk_path, signatures):
"""استعادة الملفات باستخدام التوقيعات"""
print(f"البحث عن ملفات على {disk_path} باستخدام التوقيعات...")
# توقيعات الملفات الشائعة (magic numbers)
file_signatures = {
'jpg': b'\xFF\xD8\xFF',
'png': b'\x89\x50\x4E\x47',
'pdf': b'\x25\x50\x44\x46',
'zip': b'\x50\x4B\x03\x04',
'doc': b'\xD0\xCF\x11\xE0',
'exe': b'\x4D\x5A'
}
recovered = []
try:
with open(disk_path, 'rb') as disk:
# قراءة القرص في قطع
chunk_size = 4096
offset = 0
while True:
chunk = disk.read(chunk_size)
if not chunk:
break
# البحث عن التوقيعات
for file_type, signature in file_signatures.items():
if signature in chunk:
# استخراج الملف
sig_index = chunk.find(signature)
if sig_index != -1:
# حساب موقع بداية الملف
file_start = offset + sig_index
# محاولة استخراج الملف
recovered_file = self.extract_file(
disk_path,
file_start,
file_type
)
if recovered_file:
recovered.append(recovered_file)
offset += chunk_size
except Exception as e:
print(f"خطأ في قراءة القرص: {e}")
return recovered
def extract_file(self, disk_path, start_offset, file_type):
"""استخراج ملف من القرص"""
try:
# تحديد نهاية الملف (هذه طريقة مبسطة)
file_sizes = {
'jpg': 1024 * 100, # 100KB متوسط
'png': 1024 * 200, # 200KB متوسط
'pdf': 1024 * 500, # 500KB متوسط
'doc': 1024 * 300, # 300KB متوسط
}
file_size = file_sizes.get(file_type, 1024 * 100) # افتراضي 100KB
with open(disk_path, 'rb') as disk:
disk.seek(start_offset)
file_data = disk.read(file_size)
# حفظ الملف المسترجع
filename = f"recovered_{start_offset:08x}.{file_type}"
filepath = os.path.join(self.recovery_path, filename)
with open(filepath, 'wb') as f:
f.write(file_data)
# حساب الهاش
file_hash = hashlib.sha256(file_data).hexdigest()
self.recovered_files.append({
'filename': filename,
'path': filepath,
'size': len(file_data),
'hash': file_hash,
'type': file_type,
'offset': start_offset
})
print(f"✅ تم استعادة: {filename} ({len(file_data)} bytes)")
return filepath
except Exception as e:
print(f"خطأ في استخراج الملف: {e}")
return None
def recover_recent_files(self, directories, days=7):
"""استعادة الملفات المعدلة مؤخراً"""
print(f"البحث عن الملفات المعدلة خلال {days} أيام...")
recovered = []
cutoff_time = datetime.now() - timedelta(days=days)
for directory in directories:
if os.path.exists(directory):
for root, dirs, files in os.walk(directory):
for file in files:
filepath = os.path.join(root, file)
try:
mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
if mtime > cutoff_time:
# نسخ الملف
rel_path = os.path.relpath(filepath, directory)
dest_path = os.path.join(
self.recovery_path,
"recent",
rel_path
)
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
shutil.copy2(filepath, dest_path)
recovered.append({
'original': filepath,
'recovered': dest_path,
'modified': mtime.isoformat(),
'size': os.path.getsize(filepath)
})
except Exception as e:
continue
return recovered
def analyze_recovered_files(self):
"""تحليل الملفات المستعادة"""
print("\nتحليل الملفات المستعادة...")
analysis = {
'total_files': len(self.recovered_files),
'by_type': {},
'total_size': 0,
'suspicious_files': []
}
suspicious_keywords = [
'pass', 'secret', 'confidential',
'bank', 'credit', 'password',
'admin', 'root', 'key', 'token'
]
for file_info in self.recovered_files:
# إحصاء حسب النوع
file_type = file_info['type']
analysis['by_type'][file_type] = analysis['by_type'].get(file_type, 0) + 1
# جمع الحجم الكلي
analysis['total_size'] += file_info.get('size', 0)
# البحث عن ملفات مشبوهة
try:
with open(file_info['path'], 'rb') as f:
content = f.read(1024) # قراءة أول 1KB
for keyword in suspicious_keywords:
if keyword.encode() in content.lower():
analysis['suspicious_files'].append({
'file': file_info['filename'],
'keyword': keyword,
'path': file_info['path']
})
break
except:
pass
return analysis
def generate_recovery_report(self):
"""إنشاء تقرير الاستعادة"""
print("\nإنشاء تقرير الاستعادة...")
analysis = self.analyze_recovered_files()
report = {
'recovery_date': datetime.now().isoformat(),
'recovery_path': os.path.abspath(self.recovery_path),
'summary': analysis,
'recovered_files': self.recovered_files,
'chain_of_custody': [
{
'timestamp': datetime.now().isoformat(),
'action': 'recovery_completed',
'details': f"تم استعادة {analysis['total_files']} ملف"
}
]
}
# حفظ التقرير
report_file = os.path.join(self.recovery_path, "recovery_report.json")
import json
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"✅ تم حفظ تقرير الاستعادة في: {report_file}")
return report_file
# تشغيل أداة الاستعادة
if __name__ == "__main__":
print("بدء أداة استعادة الملفات الرقمية")
print("=" * 40)
# إنشاء أداة الاستعادة
recovery = FileRecoveryTool("digital_recovery_evidence")
# استعادة الملفات المعدلة حديثاً
print("\n1. استعادة الملفات المعدلة حديثاً:")
recent_dirs = ["/etc", "/var/log", "/tmp"]
recent_files = recovery.recover_recent_files(recent_dirs, days=3)
print(f"✅ تم استعادة {len(recent_files)} ملف معدل حديثاً")
# محاولة استعادة باستخدام التوقيعات (محاكاة)
print("\n2. البحث عن ملفات محذوفة باستخدام التوقيعات:")
# في الواقع، ستكون disk_path مثل "/dev/sda1"
# signatures = recovery.recover_deleted_files_by_signature("/dev/sda1", {})
# إنشاء تقرير
print("\n3. إنشاء تقرير الاستعادة:")
report = recovery.generate_recovery_report()
# عرض ملخص
analysis = recovery.analyze_recovered_files()
print("\n" + "=" * 40)
print("ملخص الاستعادة:")
print(f" • إجمالي الملفات: {analysis['total_files']}")
print(f" • الحجم الكلي: {analysis['total_size']:,} بايت")
print(f" • الملفات المشبوهة: {len(analysis['suspicious_files'])}")
if analysis['suspicious_files']:
print("\n ⚠️ الملفات المشبوهة:")
for suspicious in analysis['suspicious_files'][:3]:
print(f" - {suspicious['file']} ({suspicious['keyword']})")
print("\n✅ اكتملت عملية استعادة الملفات")
# 5. أوامر Linux لاستعادة الملفات
echo "أمثلة أوامر استعادة الملفات:"
# البحث عن الملفات المحذوفة
grep -a "keyword" /dev/sda1 | head -20
# استعادة سجلات النظام
cp /var/log/auth.log ./recovered/
cp /var/log/syslog ./recovered/
# إنشاء صورة للذاكرة
# dd if=/dev/mem of=memory_dump.img bs=1M
# تحليل الملفات الثنائية
strings deleted_file.bin | grep -i "password\|user\|admin"
# فحص الملفات المؤقتة
find /tmp -type f -mtime -1 2>/dev/null
find /var/tmp -type f -size +1M 2>/dev/null
# حساب هاش الملفات المستعادة
find recovered_files -type f -exec sha256sum {} \; > recovered_hashes.txt
echo "تم تعلم استعادة الملفات بنجاح!"
الخطوة 3: تعلم تحليل السجلات
تحليل السجلات هو عملية تحليل السجلات (Logs) لاكتشاف الأنشطة المشبوهة أو غير المصرح بها. يتم استخدامه لاكتشاف الهجمات السيبرانية أو تتبع الأنشطة غير القانونية.
الأهمية:
ضروري لفهم كيفية اكتشاف التهديدات ومنع الهجمات المستقبلية
الأدوات:
Splunk، ELK Stack
أمثلة تحليل السجلات:
# ===== تحليل السجلات باستخدام Splunk و ELK Stack =====
# 1. استعلامات Splunk للتحقيقات الرقمية
echo "1. استعلامات Splunk للتحقيقات الرقمية:"
# اكتشاف محاولات تسجيل الدخول الفاشلة
echo "استعلام لتسجيلات الدخول الفاشلة:"
echo 'index=main sourcetype=secure "Failed password" | stats count by src_ip, user'
# اكتشاف نشاط غير عادي
echo "استعلام للنشاط غير العادي:"
echo 'index=main sourcetype=web_access status=404 | top 10 uri'
# تتبع نشاط المستخدم
echo "استعلام لتتبع نشاط المستخدم:"
echo 'index=main user=admin | reverse | table _time, action, src_ip, dest_ip'
# اكتشاف البرمجيات الخبيثة
echo "استعلام للكشف عن البرمجيات الخبيثة:"
echo 'index=main "malware" OR "virus" OR "trojan" | stats count by src_ip, dest_ip'
# 2. استعلامات ELK Stack (Kibana)
echo -e "\n2. استعلامات ELK Stack للتحقيقات الرقمية:"
# استعلام Elasticsearch لاكتشاف الهجمات
echo "استعلام Elasticsearch:"
echo 'GET /logs-*/_search
{
"query": {
"bool": {
"must": [
{ "match": { "event.action": "login_failed" } },
{ "range": { "@timestamp": { "gte": "now-24h" } } }
]
}
},
"aggs": {
"failed_by_ip": {
"terms": { "field": "source.ip.keyword", "size": 10 }
}
}
}'
# 3. سكربت Python لتحليل السجلات المتقدم
import re
import json
from datetime import datetime, timedelta
from collections import Counter, defaultdict
class LogAnalyzer:
def __init__(self):
self.findings = []
self.suspicious_ips = set()
self.patterns = {
'failed_login': r'Failed password for (\S+) from (\S+)',
'invalid_user': r'Invalid user (\S+) from (\S+)',
'brute_force': r'POSSIBLE BREAK-IN ATTEMPT',
'port_scan': r'port scan',
'malware': r'(malware|virus|trojan|ransomware)',
'data_exfiltration': r'(upload|download).*(sensitive|confidential)'
}
def analyze_auth_log(self, log_file):
"""تحليل سجلات المصادقة"""
print(f"تحليل سجلات المصادقة: {log_file}")
findings = {
'failed_logins': [],
'invalid_users': [],
'suspicious_ips': [],
'timeline': []
}
try:
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# اكتشاف محاولات تسجيل دخول فاشلة
failed_match = re.search(self.patterns['failed_login'], line)
if failed_match:
user, ip = failed_match.groups()
findings['failed_logins'].append({
'line': line_num,
'user': user,
'ip': ip,
'timestamp': self.extract_timestamp(line),
'raw': line[:100] + '...' if len(line) > 100 else line
})
if ip not in self.suspicious_ips:
self.suspicious_ips.add(ip)
# اكتشاف مستخدمين غير صالحين
invalid_match = re.search(self.patterns['invalid_user'], line)
if invalid_match:
user, ip = invalid_match.groups()
findings['invalid_users'].append({
'line': line_num,
'user': user,
'ip': ip,
'timestamp': self.extract_timestamp(line),
'raw': line[:100] + '...' if len(line) > 100 else line
})
# اكتشاف محاولات اختراق
if 'POSSIBLE BREAK-IN ATTEMPT' in line:
findings['suspicious_ips'].append({
'line': line_num,
'type': 'break_in_attempt',
'ip': self.extract_ip(line),
'timestamp': self.extract_timestamp(line),
'raw': line[:100] + '...' if len(line) > 100 else line
})
# إنشاء جدول زمني
timestamp = self.extract_timestamp(line)
if timestamp:
findings['timeline'].append({
'timestamp': timestamp,
'event': self.classify_event(line),
'line': line_num
})
except Exception as e:
print(f"خطأ في تحليل ملف السجل: {e}")
return findings
def extract_timestamp(self, log_line):
"""استخراج الطابع الزمني من سجل"""
# أنماط الطوابع الزمنية الشائعة
patterns = [
r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', # Jan 1 12:00:00
r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})', # ISO 8601
r'(\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2})' # MM/DD/YYYY HH:MM:SS
]
for pattern in patterns:
match = re.search(pattern, log_line)
if match:
return match.group(1)
return None
def extract_ip(self, log_line):
"""استخراج عنوان IP من السجل"""
ip_pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
match = re.search(ip_pattern, log_line)
return match.group(0) if match else 'Unknown'
def classify_event(self, log_line):
"""تصنيف الحدث بناءً على محتوى السجل"""
if 'Failed password' in log_line:
return 'failed_login'
elif 'Invalid user' in log_line:
return 'invalid_user'
elif 'Accepted password' in log_line:
return 'successful_login'
elif 'session opened' in log_line:
return 'session_opened'
elif 'session closed' in log_line:
return 'session_closed'
elif 'error' in log_line.lower():
return 'error'
else:
return 'other'
def analyze_web_logs(self, log_file):
"""تحليل سجلات الويب"""
print(f"تحليل سجلات الويب: {log_file}")
findings = {
'suspicious_requests': [],
'failed_requests': [],
'attack_patterns': [],
'statistics': defaultdict(int)
}
attack_patterns = {
'sql_injection': r"('|%27|--|union|select|insert|update|delete|drop)",
'xss': r"(