Advanced Topics

For experienced users: transfer learning, custom log formats, performance optimization, and extending the system.

Transfer Learning: Reusing Pre-Trained Models

Transfer learning lets you leverage models trained on large datasets and adapt them to your specific logs. This is useful when:

  • You have limited log data
  • You want to get started quickly without long training
  • You're monitoring similar systems

How Transfer Learning Works

The text autoencoder learns general language patterns in logs. For a new system:

  1. Start with a pre-trained text encoder (trained on a large log dataset)
  2. Fine-tune it on your specific logs (far fewer epochs)
  3. Train a new anomaly detector on your own data

This combines general knowledge (from pre-training) with specific patterns (from your data).

Implementing Transfer Learning


import tensorflow as tf

# Load the pre-trained text encoder
pre_trained_encoder = tf.keras.models.load_model(
    "pretrained_models/text_encoder_hdfs.h5"
)

# Freeze early layers (keep pre-trained weights)
for layer in pre_trained_encoder.layers[:-2]:
    layer.trainable = False

# Fine-tune on your data with a small learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
pre_trained_encoder.compile(
    optimizer=optimizer,
    loss='mse'
)

# Train with few epochs
pre_trained_encoder.fit(
    your_data,
    epochs=3,  # Far fewer than the ~10 needed from scratch
    batch_size=64,
    validation_split=0.2
)

# Save the fine-tuned model
pre_trained_encoder.save("your_domain_encoder.h5")

Supporting Custom Log Formats

DeepSentry is built for Utah format logs, but your logs might have different formats. Here's how to adapt:

Custom Log Parser

Create a custom parser for your log format:


# src/custom_log_parser.py

import json

class CustomLogParser:
    def parse_message(self, line):
        """
        Parse the custom format and extract timestamp and message.
        Returns: (timestamp, message)
        """
        # Example: parse JSON logs
        data = json.loads(line)
        timestamp = data['time']
        message = data['message']
        return timestamp, message

    def parse_file(self, file_handle):
        """Iterate over (timestamp, message) tuples"""
        for line in file_handle:
            if not line.strip():
                continue
            yield self.parse_message(line)

# Usage in prepare.py:
parser = CustomLogParser()
prepared_data = []
with open("logs.json") as f:
    for timestamp, message in parser.parse_file(f):
        # Process message
        prepared_data.append(message)

Integrate with Pipeline

Modify src/tx/prepare.py to use your custom parser instead of UtahLogDatasetParseTools.
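A minimal sketch of the swap, with the parser injected so the rest of the pipeline stays unchanged. The function name, JSON shape, and loading step below are illustrative assumptions, not DeepSentry's actual prepare.py API:

```python
import io
import json

# Stand-in for the CustomLogParser from the previous section
class CustomLogParser:
    def parse_file(self, file_handle):
        for line in file_handle:
            if line.strip():
                data = json.loads(line)
                yield data['time'], data['message']

def load_messages(file_handle, parser):
    """prepare.py-style loading step; any parser with parse_file() works."""
    return [message for _, message in parser.parse_file(file_handle)]

# Swapping parsers is now a one-line change at the call site:
logs = io.StringIO('{"time": "t1", "message": "disk full"}\n')
messages = load_messages(logs, CustomLogParser())
```

Because the parser is passed in rather than hard-coded, the Utah-format parser and your custom one are interchangeable behind the same (timestamp, message) contract.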

Performance Optimization

Batch Processing Optimization

For very large datasets, optimize batch processing:


import numpy as np

# Instead of loading the entire dataset:
# ❌ data = load_all_data()  # Memory intensive

# Use a generator to stream from disk:
def data_generator(file_path, batch_size=32):
    """Stream batches from disk instead of loading everything at once.
    Loops forever so Keras can draw steps_per_epoch batches every epoch."""
    while True:
        buffer = []
        with open(file_path) as f:
            for line in f:
                buffer.append(line.strip())
                if len(buffer) >= batch_size:
                    yield np.array(buffer)
                    buffer = []
            if buffer:
                yield np.array(buffer)

# Train with the generator (batches must already be model-ready arrays;
# insert your vectorization step inside the generator if they are raw text):
model.fit(
    data_generator("logs.txt", batch_size=32),
    steps_per_epoch=1000,
    epochs=10
)

Model Quantization

Reduce model size for faster inference and lower memory:


import tensorflow as tf

# Load the full-precision model
model = tf.keras.models.load_model("encoder.h5")

# Convert with dynamic-range quantization (weights stored as int8)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_model = converter.convert()

# Save the smaller model
with open("encoder_quantized.tflite", "wb") as f:
    f.write(quantized_model)

# Inference is now faster and uses less memory
interpreter = tf.lite.Interpreter(model_path="encoder_quantized.tflite")
interpreter.allocate_tensors()

Caching Encoded Vectors

Cache encoded vectors to avoid re-encoding identical messages:


class CachedEncoder:
    def __init__(self, model, cache_size=10000):
        self.model = model
        self.cache = {}          # message -> encoded vector
        self.cache_size = cache_size

    def encode(self, message):
        # Check the cache first
        if message in self.cache:
            return self.cache[message]

        # Encode if not cached (assumes the model accepts the message as-is;
        # insert your tokenization/vectorization step here if it does not)
        vector = self.model.predict(message)

        # Store in the cache until the size limit is reached
        # (note: entries are never evicted once the cache is full)
        if len(self.cache) < self.cache_size:
            self.cache[message] = vector

        return vector
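If the encoder can be wrapped in a plain function of a hashable argument, the standard library's functools.lru_cache gives the same effect with true least-recently-used eviction. A minimal sketch with a stand-in encode function (the real model call would replace it):

```python
from functools import lru_cache

# Stand-in for the real model call; replace with your encoder.
def expensive_encode(message):
    return [float(len(message))]  # placeholder "vector"

@lru_cache(maxsize=10000)
def cached_encode(message):
    # lru_cache keys on the message string and evicts the least-recently-used
    # entry once maxsize is reached; return a tuple so the value is hashable.
    return tuple(expensive_encode(message))

v1 = cached_encode("disk full")   # computed
v2 = cached_encode("disk full")   # served from the cache, no re-encode
```

cached_encode.cache_info() reports hits and misses, which is handy for checking whether the cache is actually paying off on your log stream.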

Ensemble Methods: Combining Multiple Models

Improve reliability by combining predictions from multiple models:


import numpy as np
import tensorflow as tf

class EnsembleAnomalyDetector:
    def __init__(self, model_paths):
        """Load multiple trained anomaly detectors"""
        self.models = [
            tf.keras.models.load_model(path)
            for path in model_paths
        ]

    def _scores(self, sequence):
        """Reconstruction error of each model on the sequence"""
        return [
            np.mean(np.abs(model.predict(sequence) - sequence))
            for model in self.models
        ]

    def score(self, sequence):
        """Average the predictions from all models (consensus)"""
        return np.mean(self._scores(sequence))

    def score_with_confidence(self, sequence):
        """Also return confidence/agreement among the models"""
        scores = self._scores(sequence)
        avg_score = np.mean(scores)
        std_score = np.std(scores)  # How much the models disagree
        # Low spread relative to the mean -> confidence close to 1.0
        confidence = 1.0 - (std_score / (avg_score + 1e-6))
        return avg_score, confidence

Temporal Analysis: Seasonal Patterns

Some systems have predictable hourly, daily, or weekly patterns. Handle seasonality:


class SeasonalAnomalyDetector:
    def __init__(self, base_detector, bucket_hours=1):
        """Maintain separate baselines for different times of day.
        bucket_hours: width of each time-of-day bucket, in hours."""
        self.base_detector = base_detector
        self.bucket_hours = bucket_hours
        self.seasonal_thresholds = {}

    def get_threshold(self, timestamp):
        """Get the threshold appropriate for the time of day"""
        bucket = timestamp.hour // self.bucket_hours

        # Use the bucket-specific threshold if one has been set
        if bucket in self.seasonal_thresholds:
            return self.seasonal_thresholds[bucket]

        # Otherwise fall back to the default
        return 2.5

    def score_with_season(self, sequence, timestamp):
        """Score a sequence against its time-of-day threshold"""
        base_score = self.base_detector.score(sequence)
        threshold = self.get_threshold(timestamp)

        # The anomaly decision depends on the seasonal threshold
        is_anomaly = base_score > threshold

        return base_score, is_anomaly, threshold

Integration with External Systems

Export to Prometheus Metrics


from prometheus_client import Counter, Gauge

# Define metrics
anomalies_detected = Counter(
    'deepsentry_anomalies_total',
    'Total anomalies detected',
    ['system', 'level']
)

anomaly_score = Gauge(
    'deepsentry_anomaly_score',
    'Current anomaly score'
)

# In live monitoring:
if score > threshold:
    anomalies_detected.labels(
        system='hdfs',
        level='critical'
    ).inc()

anomaly_score.set(score)

Stream to Elasticsearch


from elasticsearch import Elasticsearch

class ElasticsearchWriter:
    def __init__(self, host='localhost', port=9200):
        # elasticsearch-py 8.x takes node URLs; the doc_type
        # parameter was removed along with mapping types
        self.es = Elasticsearch(f"http://{host}:{port}")

    def write_anomaly(self, timestamp, message, score):
        doc = {
            '@timestamp': timestamp,
            'message': message,
            'anomaly_score': score,
            'is_anomaly': score > 2.5
        }
        self.es.index(index='deepsentry', document=doc)

Monitoring Model Drift

Models trained on old data may not work well as systems evolve. Detect when retraining is needed:


from collections import deque

import numpy as np

class DriftDetector:
    def __init__(self, baseline_mean, baseline_std):
        self.baseline_mean = baseline_mean
        self.baseline_std = baseline_std
        # deque drops the oldest score automatically at the length cap
        self.recent_scores = deque(maxlen=1000)

    def update(self, score):
        self.recent_scores.append(score)

    def detect_drift(self):
        """Check if recent scores differ significantly from the baseline"""
        if len(self.recent_scores) < 100:
            return False, 0.0

        recent_mean = np.mean(self.recent_scores)

        # Z-score of the recent mean vs the baseline (epsilon guards
        # against a degenerate zero-variance baseline)
        z_score = abs(recent_mean - self.baseline_mean) / (self.baseline_std + 1e-9)

        drift_detected = z_score > 3.0  # 3-sigma threshold
        return drift_detected, z_score
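The baseline statistics themselves can be taken from the detector's scores on held-out normal data. A minimal sketch (the score values below are made up for illustration; in practice, collect them by scoring your training or validation set):

```python
import statistics

# Anomaly scores the trained detector produced on held-out normal
# data (illustrative values, not real DeepSentry output)
training_scores = [0.8, 1.1, 0.9, 1.0, 1.2, 0.95, 1.05, 0.85]

baseline_mean = statistics.mean(training_scores)
baseline_stdev = statistics.stdev(training_scores)

# These two numbers seed the drift check: a 3-sigma shift in the mean
# of recent live scores relative to this baseline signals that the
# system has evolved and the model is due for retraining.
```

Recomputing the baseline after each retraining keeps the drift check calibrated to the current model rather than its predecessor.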