Advanced Topics
Transfer Learning: Reusing Pre-Trained Models
Transfer learning lets you leverage models trained on large datasets and adapt them to your specific logs. This is useful when:
- You have limited log data
- You want to get started quickly without long training
- You're monitoring similar systems
How Transfer Learning Works
The text autoencoder learns general language patterns in logs. For a new system:
- Start with pre-trained text encoder (trained on large log dataset)
- Fine-tune on your specific logs (fewer epochs)
- Train new anomaly detector on your data
This combines general knowledge (from pre-training) with specific patterns (from your data).
Implementing Transfer Learning
# Load the pre-trained text encoder (trained on a large log dataset).
encoder = tf.keras.models.load_model(
    "pretrained_models/text_encoder_hdfs.h5"
)

# Freeze all but the last two layers so the general language patterns
# learned during pre-training are preserved.
for frozen_layer in encoder.layers[:-2]:
    frozen_layer.trainable = False

# Fine-tune with a small learning rate: we want to adjust the
# pre-trained weights slightly, not overwrite them.
encoder.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    loss='mse'
)

# A handful of epochs is enough when starting from pre-trained weights
# (training from scratch takes around 10).
encoder.fit(
    your_data,
    epochs=3,
    batch_size=64,
    validation_split=0.2
)

# Persist the fine-tuned model for this domain.
encoder.save("your_domain_encoder.h5")
Supporting Custom Log Formats
DeepSentry is built for Utah format logs, but your logs might have different formats. Here's how to adapt:
Custom Log Parser
Create a custom parser for your log format:
# src/custom_log_parser.py
import json


class CustomLogParser:
    """Parser for custom-format (here: JSON) log lines."""

    def parse_message(self, line):
        """
        Parse one custom-format line and extract timestamp and message.
        Returns: (timestamp, message)
        """
        # Example: parse JSON logs with `time` and `message` fields.
        record = json.loads(line)
        return record['time'], record['message']

    def parse_file(self, file_handle):
        """Iterate over (timestamp, message) tuples, skipping blank lines."""
        for raw in file_handle:
            if raw.strip():
                yield self.parse_message(raw)
# Usage in prepare.py:
parser = CustomLogParser()
with open("logs.json") as log_file:
    # Collect each parsed message for the preparation pipeline.
    prepared_data.extend(
        message for _timestamp, message in parser.parse_file(log_file)
    )
Integrate with Pipeline
Modify src/tx/prepare.py to use your custom parser instead of UtahLogDatasetParseTools.
Performance Optimization
Batch Processing Optimization
For very large datasets, optimize batch processing:
# Instead of loading the entire dataset:
# ❌ data = load_all_data()  # Memory intensive

# Use a generator to stream from disk:
def data_generator(file_path, batch_size=32):
    """Stream data from disk in numpy batches instead of loading it all at once."""
    batch = []
    with open(file_path) as handle:
        for raw_line in handle:
            batch.append(raw_line.strip())
            if len(batch) >= batch_size:
                yield np.array(batch)
                batch = []
    # Flush the final partial batch, if any.
    if batch:
        yield np.array(batch)
# Train with generator:
# NOTE(review): a plain Python generator is exhausted after one pass;
# with epochs=10 Keras needs a fresh iterator each epoch — confirm the
# real training loop re-creates the generator (or wraps it in tf.data).
model.fit(
    data_generator("logs.txt", batch_size=32),  # streams batches of 32 lines
    steps_per_epoch=1000,  # batches consumed per epoch
    epochs=10
)
Model Quantization
Reduce model size for faster inference and lower memory:
import tensorflow as tf

# Load the full-precision Keras model.
model = tf.keras.models.load_model("encoder.h5")

# Convert with default optimizations. This performs dynamic-range
# quantization: weights are stored as int8, activations stay float32.
# (Full int8 quantization would additionally require a
# representative_dataset for calibration.)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_model = converter.convert()

# Save the smaller model.
with open("encoder_quantized.tflite", "wb") as f:
    f.write(quantized_model)

# Inference is now faster and uses less memory.
interpreter = tf.lite.Interpreter("encoder_quantized.tflite")
# Required before any get/set_tensor or invoke() call — without this
# the interpreter raises at inference time.
interpreter.allocate_tensors()
Caching Encoded Vectors
Cache encoded vectors to avoid re-encoding identical messages:
from functools import lru_cache
class CachedEncoder:
    """Wrap an encoder model with a bounded cache of encoded vectors.

    Identical messages are encoded once and served from the cache on
    repeats. When the cache is full, the oldest entry is evicted (FIFO)
    so new messages keep benefiting from caching — the previous
    behaviour was to silently stop caching once the limit was reached,
    leaving stale entries pinned forever.
    """

    def __init__(self, model, cache_size=10000):
        self.model = model            # object exposing .predict(message)
        self.cache = {}               # message -> encoded vector
        self.cache_size = cache_size  # maximum number of cached entries

    def encode(self, message):
        """Return the encoded vector for *message*, using the cache."""
        # Serve from cache when possible.
        if message in self.cache:
            return self.cache[message]
        # Encode on a cache miss.
        vector = self.model.predict(message)
        # Evict the oldest insertion (dicts preserve insertion order)
        # so the cache stays bounded but keeps accepting new entries.
        if len(self.cache) >= self.cache_size:
            self.cache.pop(next(iter(self.cache)))
        self.cache[message] = vector
        return vector
Ensemble Methods: Combining Multiple Models
Improve reliability by combining predictions from multiple models:
class EnsembleAnomalyDetector:
    """Combine several trained anomaly detectors into one scorer."""

    def __init__(self, model_paths):
        """Load multiple trained anomaly detectors from *model_paths*."""
        self.models = [
            tf.keras.models.load_model(path)
            for path in model_paths
        ]

    def _member_scores(self, sequence):
        """Mean-absolute reconstruction error of each member model."""
        return [
            np.mean(np.abs(model.predict(sequence) - sequence))
            for model in self.models
        ]

    def score(self, sequence):
        """Return the average (consensus) score across all models."""
        return np.mean(self._member_scores(sequence))

    def score_with_confidence(self, sequence):
        """Return (average score, confidence among models).

        Confidence is high when the members agree: it is 1 minus the
        relative spread of their scores.
        """
        scores = self._member_scores(sequence)
        avg_score = np.mean(scores)
        std_score = np.std(scores)  # how much the models disagree
        # 1e-6 guards against division by zero for near-zero scores.
        confidence = 1.0 - (std_score / (avg_score + 1e-6))
        return avg_score, confidence
Temporal Analysis: Seasonal Patterns
Some systems have predictable hourly, daily, or weekly patterns. Account for this seasonality so that routine periodic load (e.g. nightly batch jobs) is not flagged as anomalous:
class SeasonalAnomalyDetector:
    """Apply time-of-day-specific thresholds on top of a base detector."""

    def __init__(self, base_detector, season_hours=24):
        """Maintain separate baselines for different times of day."""
        self.base_detector = base_detector
        self.season_hours = season_hours
        self.seasonal_thresholds = {}

    def get_threshold(self, timestamp):
        """Return the threshold for *timestamp*'s time-of-day bucket."""
        bucket = timestamp.hour // (24 // self.season_hours)
        # Fall back to the global default when no season-specific
        # threshold has been recorded for this bucket.
        return self.seasonal_thresholds.get(bucket, 2.5)

    def score_with_season(self, sequence, timestamp):
        """Score *sequence* and flag it against the seasonal threshold."""
        base_score = self.base_detector.score(sequence)
        threshold = self.get_threshold(timestamp)
        return base_score, base_score > threshold, threshold
Integration with External Systems
Export to Prometheus Metrics
from prometheus_client import Counter, Gauge
# Define metrics
# Counter: monotonically increasing total of detected anomalies,
# partitioned by originating system and severity level.
anomalies_detected = Counter(
    'deepsentry_anomalies_total',
    'Total anomalies detected',
    ['system', 'level']
)
# Gauge: the most recent anomaly score (can move up or down).
anomaly_score = Gauge(
    'deepsentry_anomaly_score',
    'Current anomaly score'
)
# In live monitoring:
# NOTE(review): `score` and `threshold` come from the surrounding
# monitoring loop (not shown in this fragment).
if score > threshold:
    anomalies_detected.labels(
        system='hdfs',
        level='critical'
    ).inc()
# Gauge is updated on every pass so dashboards track the live score,
# not only anomalous ones — confirm this matches the monitoring loop.
anomaly_score.set(score)
Stream to Elasticsearch
from elasticsearch import Elasticsearch
class ElasticsearchWriter:
    """Stream scored log messages to an Elasticsearch index.

    The anomaly cut-off and target index are configurable (they were
    previously hard-coded to 2.5 and 'deepsentry'); the defaults keep
    the original behaviour for existing callers.
    """

    def __init__(self, host='localhost', port=9200,
                 index='deepsentry', threshold=2.5):
        self.es = Elasticsearch([{'host': host, 'port': port}])
        self.index_name = index    # index documents are written to
        self.threshold = threshold # score above which is_anomaly is set

    def write_anomaly(self, timestamp, message, score):
        """Index one scored log message as a document."""
        doc = {
            '@timestamp': timestamp,
            'message': message,
            'anomaly_score': score,
            'is_anomaly': score > self.threshold
        }
        # NOTE(review): `doc_type` is deprecated since Elasticsearch 7
        # and removed in 8 — drop it once the target cluster is upgraded.
        self.es.index(index=self.index_name, doc_type='_doc', body=doc)
Monitoring Model Drift
Models trained on old data may not work well as systems evolve. Detect when retraining is needed:
from collections import deque


class DriftDetector:
    """Detect when recent anomaly scores drift away from the baseline.

    Keeps a sliding window of recent scores and compares its mean
    against the training-time baseline with a z-score test. The window,
    minimum sample count, and sigma threshold are configurable (they
    were hard-coded to 1000 / 100 / 3.0); the defaults preserve the
    original behaviour.
    """

    def __init__(self, baseline_mean, baseline_std,
                 window=1000, min_samples=100, z_threshold=3.0):
        self.baseline_mean = baseline_mean
        self.baseline_std = baseline_std
        self.min_samples = min_samples
        self.z_threshold = z_threshold
        # deque(maxlen=...) gives an O(1) sliding window; the previous
        # list.pop(0) implementation was O(n) per update.
        self.recent_scores = deque(maxlen=window)

    def update(self, score):
        """Record one score; the oldest is discarded once the window is full."""
        self.recent_scores.append(score)

    def detect_drift(self):
        """Return (drift_detected, drift_magnitude as a z-score)."""
        # Not enough evidence yet to make a call.
        if len(self.recent_scores) < self.min_samples:
            return False, 0.0
        recent_mean = np.mean(self.recent_scores)
        # Guard against a degenerate zero-variance baseline.
        denom = self.baseline_std if self.baseline_std else 1e-9
        z_score = abs(recent_mean - self.baseline_mean) / denom
        return bool(z_score > self.z_threshold), float(z_score)