Skip to main content

Advanced Integration Patterns

Once you’ve mastered the basics of ATP integration, these advanced patterns can help you build more robust, maintainable, and efficient implementations.

The Decorator Pattern

The decorator pattern provides a clean way to separate ATP functionality from your core business logic.

Python Decorator Example

import functools
from typing import Callable, Dict, Any

def notify_on_completion(notification_type: str, title_template: str):
    """
    Build a decorator that emits an ATP notification once the wrapped
    callable has returned.

    Args:
        notification_type: Value forwarded as ``notification_type`` to
            ``send_atp_notification``.
        title_template: ``str.format`` template for the notification title.
            It is rendered with the keys ``function_name``, ``args``,
            ``kwargs`` and ``result``.

    Returns:
        A decorator. The decorated function returns whatever the original
        function returns.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Run the wrapped callable first; if it raises, the exception
            # propagates and no notification is sent.
            outcome = func(*args, **kwargs)

            # Everything the title template and the notification can refer to.
            details = dict(
                function_name=func.__name__,
                args=args,
                kwargs=kwargs,
                result=outcome,
            )

            # The title is rendered before the notification call is made.
            send_atp_notification(
                notification_type=notification_type,
                title=title_template.format(**details),
                context=details,
            )

            return outcome

        return wrapper

    return decorator

# Usage example
@notify_on_completion(
    notification_type="task_completed",
    title_template="Task {function_name} completed successfully",
)
def process_order(order_id: str, user_id: str):
    """Process an order and return its completion record.

    The decorator sends a ``task_completed`` notification after this
    function returns.
    """
    # Process the order...
    outcome = {"status": "completed"}
    outcome["order_id"] = order_id
    return outcome

TypeScript Decorator Example

import { ATPClient } from '@atp/client';

// Notification decorator factory
/**
 * Method decorator factory: after the decorated method resolves, send an ATP
 * notification whose title is rendered from `titleTemplate`.
 *
 * Placeholders are written as `{path}`, where `path` starts with a context key
 * (`functionName`, `args`, `result`) and may continue with property or index
 * accessors, e.g. `{functionName}`, `{args[0]}`, `{result.status}`.
 * A placeholder that does not resolve to a value is left in the title as-is.
 *
 * @param notificationType Value sent as the notification `type`.
 * @param titleTemplate Title template containing `{...}` placeholders.
 */
function NotifyOnCompletion(notificationType: string, titleTemplate: string) {
  // Walk a path such as "args[0]" or "result.status" through the context.
  const resolvePath = (root: Record<string, unknown>, path: string): unknown =>
    path
      .split(/[.\[\]]+/)
      .filter(segment => segment.length > 0)
      .reduce<unknown>(
        (current, segment) =>
          current === null || current === undefined
            ? undefined
            : (current as Record<string, unknown>)[segment],
        root
      );

  return function (
    target: any,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ) {
    const originalMethod = descriptor.value;

    descriptor.value = async function (...args: any[]) {
      // Call the original method; if it rejects, no notification is sent.
      const result = await originalMethod.apply(this, args);

      // Context exposed to the title template and sent with the notification
      const context: Record<string, unknown> = {
        functionName: propertyKey,
        args,
        result
      };

      // The pattern accepts dots and brackets so that nested placeholders
      // such as {args[0]} are matched, not only bare identifiers.
      const title = titleTemplate.replace(/{([\w.\[\]]+)}/g, (placeholder, path) => {
        const value = resolvePath(context, path);
        return value !== undefined ? String(value) : placeholder;
      });

      // Send ATP notification
      const atpClient = new ATPClient();
      await atpClient.sendNotification({
        type: notificationType,
        title,
        context
      });

      return result;
    };

    return descriptor;
  };
}

// Usage example
class OrderService {
  // A 'task_completed' notification is sent once processOrder has resolved.
  @NotifyOnCompletion('task_completed', 'Order {args[0]} processed successfully')
  async processOrder(orderId: string, userId: string) {
    // Process the order...
    const status = 'completed';
    return { status, orderId };
  }
}

Benefits of the Decorator Approach

  1. Separation of Concerns: Keeps notification logic separate from core business logic
  2. Reusability: Apply the same notification patterns across different functions
  3. Configurability: Customize notifications without changing the underlying code
  4. Testability: Test your core logic independently from ATP integrations

Polling vs. Webhooks

ATP supports both polling and webhook mechanisms for receiving responses. Here’s how to implement each efficiently:

Webhook Implementation

# Webhook handler with advanced features
from flask import Flask, request, jsonify
import hmac
import hashlib
import json
import redis

# Flask application that hosts the ATP webhook endpoint.
app = Flask(__name__)
# Redis connection used by the webhook handler to remember which webhook IDs
# were already handled. Created without arguments, so library defaults apply.
redis_client = redis.Redis()

@app.route('/atp/webhook', methods=['POST'])
def handle_atp_webhook():
    """Receive an ATP webhook, verify it, and process it at most once.

    Returns:
        A ``(json, status)`` pair:

        * 400 - the body is not a JSON object or lacks a required field.
        * 401 - ``verify_signature`` rejected the request.
        * 200 - the response was processed, or had been processed before.
        * 500 - processing failed. The webhook is deliberately NOT marked as
          processed in this case, so a retried delivery is handled again
          instead of being dropped as a duplicate.
    """
    # silent=True returns None instead of raising on a missing/invalid body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid payload"}), 400

    # Verify the signature
    # NOTE(review): verify_signature is defined outside this snippet.
    signature = request.headers.get('X-ATP-Signature')
    if not verify_signature(payload, signature):
        return jsonify({"error": "Invalid signature"}), 401

    # Check for duplicate webhook (idempotency)
    webhook_id = payload.get('webhook_id')
    processed_key = f"processed_webhook:{webhook_id}" if webhook_id else None
    if processed_key and redis_client.exists(processed_key):
        return jsonify({"status": "already processed"}), 200

    # A malformed payload is the sender's error, not a processing failure.
    try:
        notification_id = payload['notification_id']
        action_id = payload['action_id']
        response_data = payload['response_data']
    except KeyError as missing:
        return jsonify({"error": f"Missing field: {missing.args[0]}"}), 400

    try:
        process_response(notification_id, action_id, response_data)
    except Exception:
        # Log the details server-side; do not leak them to the caller and do
        # not record the webhook as processed, so a retry can succeed.
        app.logger.exception(
            "Failed to process ATP webhook for notification %s", notification_id
        )
        return jsonify({"status": "error", "message": "Failed to process webhook"}), 500

    # Mark webhook as processed (kept for 24 hours)
    if processed_key:
        redis_client.setex(processed_key, 86400, "1")

    return jsonify({"status": "success"}), 200

Polling Implementation

import { ATPClient } from '@atp/client';

/**
 * Polls the ATP server for responses to a set of notifications.
 *
 * Polling starts when the first notification is added and stops when the
 * last one is removed. Only one poll request is outstanding at a time.
 */
class ResponsePoller {
  private atpClient: ATPClient;
  private pollingInterval: number; // milliseconds
  private isPolling: boolean = false;
  private intervalId?: NodeJS.Timeout;
  private notificationIds: string[] = [];
  // True while a getResponses() request is outstanding. Without this guard a
  // slow request overlaps with the next timer tick and the same response is
  // processed twice.
  private pollInFlight: boolean = false;

  constructor(atpClient: ATPClient, pollingInterval: number = 60000) {
    this.atpClient = atpClient;
    this.pollingInterval = pollingInterval;
  }

  // Add a notification to poll for (duplicates are ignored)
  public addNotification(notificationId: string): void {
    if (!this.notificationIds.includes(notificationId)) {
      this.notificationIds.push(notificationId);

      // Start polling if not already started
      if (!this.isPolling) {
        this.startPolling();
      }
    }
  }

  // Remove a notification from polling
  public removeNotification(notificationId: string): void {
    this.notificationIds = this.notificationIds.filter(id => id !== notificationId);

    // Stop polling if no more notifications to poll
    if (this.notificationIds.length === 0 && this.isPolling) {
      this.stopPolling();
    }
  }

  // Start the polling process
  private startPolling(): void {
    if (this.isPolling) return;

    this.isPolling = true;
    this.poll(); // Poll immediately; poll() handles its own errors
    this.intervalId = setInterval(() => this.poll(), this.pollingInterval);
  }

  // Stop the polling process
  private stopPolling(): void {
    if (!this.isPolling) return;

    this.isPolling = false;
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = undefined;
    }
  }

  // Poll for responses; a tick is skipped while a previous poll is running
  private async poll(): Promise<void> {
    if (this.pollInFlight || this.notificationIds.length === 0) return;

    this.pollInFlight = true;
    try {
      // Request responses for multiple notifications in batch. A copy is
      // passed because the tracked list changes while the request is pending.
      const responses = await this.atpClient.getResponses([...this.notificationIds]);

      // Process each response
      for (const response of responses) {
        // Skip notifications that were removed while the request was pending
        if (!this.notificationIds.includes(response.notification_id)) continue;

        // Process based on status; other statuses stay in the polling set
        if (response.status === 'responded') {
          this.processResponse(response);
          this.removeNotification(response.notification_id);
        } else if (response.status === 'expired') {
          this.handleExpiredNotification(response.notification_id);
          this.removeNotification(response.notification_id);
        }
      }
    } catch (error) {
      console.error('Polling error:', error);
      // Implement exponential backoff or other error handling
    } finally {
      this.pollInFlight = false;
    }
  }

  private processResponse(response: any): void {
    // Process the response based on your business logic
    const { notification_id, action_id, response_data } = response;

    // Handle the response...
    console.log(`Processed response for notification ${notification_id}`);

    // Notify your application of the response
    // e.g., emit an event, call a callback, etc.
  }

  private handleExpiredNotification(notificationId: string): void {
    // Handle expired notifications
    console.log(`Notification ${notificationId} expired without response`);

    // Notify your application that the notification expired
  }
}

// Usage
const POLL_INTERVAL_MS = 30000; // Poll every 30 seconds
const atpClient = new ATPClient({/* config */});
const poller = new ResponsePoller(atpClient, POLL_INTERVAL_MS);

// When sending a notification, register its ID so the poller tracks it
const notificationId = await atpClient.sendNotification({/* notification */});
poller.addNotification(notificationId);

Choosing Between Polling and Webhooks

| Approach | Advantages | Disadvantages | Best For |
| --- | --- | --- | --- |
| Webhooks | Real-time updates; lower latency; reduced server load | Requires public endpoint; more complex setup; needs retry handling | Services with public endpoints that need immediate responses |
| Polling | Works behind firewalls; simpler implementation; more control over timing | Higher latency; more API calls; potential rate limiting | Development environments, internal services, or when webhooks aren’t feasible |

Timeout Handling

Implement robust timeout mechanisms to handle cases where responses don’t arrive within expected timeframes.

Deadline-Based Processing

import time
from datetime import datetime, timedelta

class NotificationManager:
    """Track sent ATP notifications and apply a fallback when one times out.

    ``pending_notifications`` maps ``notification_id`` to a dict with the keys
    ``expiry_time`` (naive local ``datetime``), ``data`` and
    ``timeout_seconds``. The mapping is shared with the thread started by
    ``start_timeout_checker``, so every access is guarded by ``self._lock``.
    """

    def __init__(self, atp_client):
        """Store the client used to send notifications and update statuses."""
        import threading  # local import, as in start_timeout_checker

        self.atp_client = atp_client
        self.pending_notifications = {}  # notification_id -> pending info
        self._lock = threading.Lock()

    def send_notification(self, notification_data, timeout_seconds=3600):
        """Send a notification and start its timeout clock.

        Args:
            notification_data: Payload passed to ``atp_client.send_notification``.
            timeout_seconds: Seconds after which the notification is treated
                as timed out by ``check_timeouts``.

        Returns:
            The ``notification_id`` from the client's response.
        """
        # The client call is made outside the lock so the checker thread is
        # never blocked on it.
        response = self.atp_client.send_notification(notification_data)
        notification_id = response['notification_id']

        pending = {
            'expiry_time': datetime.now() + timedelta(seconds=timeout_seconds),
            'data': notification_data,
            'timeout_seconds': timeout_seconds,
        }
        with self._lock:
            self.pending_notifications[notification_id] = pending

        return notification_id

    def process_response(self, notification_id, action_id, response_data):
        """Handle a received response and stop tracking its notification."""
        # Process the response
        # ...

        # Remove from pending; unknown or already-removed IDs are ignored.
        with self._lock:
            self.pending_notifications.pop(notification_id, None)

    def check_timeouts(self):
        """Check for timed-out notifications and handle them.

        Expired entries are claimed (removed) under the lock before they are
        handled, so a response arriving at the same time cannot be processed
        for the same notification. If ``handle_timeout`` raises, the error is
        printed, the entry is put back for the next pass, and the remaining
        expired notifications are still handled.
        """
        now = datetime.now()

        with self._lock:
            expired_ids = [
                notification_id
                for notification_id, info in self.pending_notifications.items()
                if now > info['expiry_time']
            ]
            claimed = [
                (notification_id, self.pending_notifications.pop(notification_id))
                for notification_id in expired_ids
            ]

        for notification_id, info in claimed:
            try:
                self.handle_timeout(notification_id, info)
            except Exception as e:
                print(f"Error handling timeout for notification {notification_id}: {e}")
                with self._lock:
                    # Retry on the next pass.
                    self.pending_notifications.setdefault(notification_id, info)

    def handle_timeout(self, notification_id, info):
        """Handle a notification timeout.

        Args:
            notification_id: ID of the timed-out notification.
            info: Its pending record (``data``, ``timeout_seconds``, ...).
        """
        # Log the timeout
        print(f"Notification {notification_id} timed out after {info['timeout_seconds']} seconds")

        # Update the notification status on ATP server
        self.atp_client.update_notification_status(
            notification_id,
            status="cancelled",
            reason="Response timeout exceeded"
        )

        # Execute fallback logic
        # e.g., make a default decision, notify admin, etc.
        self.execute_timeout_fallback(notification_id, info['data'])

    def execute_timeout_fallback(self, notification_id, original_data):
        """Execute fallback logic for timed out notification"""
        # Implement your timeout fallback strategy
        # For example:
        # - Apply a default decision
        # - Escalate to a human
        # - Retry with a different approach
        pass

    def start_timeout_checker(self, check_interval=60):
        """Start a background thread that checks for timeouts.

        Args:
            check_interval: Seconds to sleep between two checks.

        Returns:
            The started daemon ``threading.Thread``.
        """
        import threading

        def check_loop():
            while True:
                try:
                    self.check_timeouts()
                except Exception as e:
                    # Keep the checker alive whatever a single pass raises.
                    print(f"Error in timeout checker: {e}")
                time.sleep(check_interval)

        thread = threading.Thread(target=check_loop, daemon=True)
        thread.start()
        return thread

Timeout Strategies

| Strategy | Implementation | Use Cases |
| --- | --- | --- |
| Default Action | After timeout, apply a conservative default decision | Approval workflows where denial is the safe default |
| Escalation | Route to human supervisor or higher-priority notification | Critical decisions requiring human judgment |
| Retry | Send a follow-up notification with urgency marker | When user response is essential |
| Graceful Degradation | Proceed with limited functionality | When system can function with reduced capabilities |
| Transaction Abortion | Cancel the entire operation | When partial completion would cause inconsistency |

Batching Techniques

Optimize performance by batching notifications and processing responses in groups.

Notification Batching

// BatchManager for ATP notifications
/**
 * Collects notifications and sends them to ATP in batches.
 *
 * A batch is sent as soon as `batchSize` notifications are queued, or when
 * `flushInterval` milliseconds have passed since the last flush (checked once
 * per second). Every promise returned by addNotification() is settled exactly
 * once, whether the batch request succeeds or fails.
 */
class ATPBatchManager {
  /**
   * @param atpClient Client exposing sendBatchNotifications(notifications).
   * @param options Optional { batchSize = 10, flushInterval = 5000 (ms) }.
   */
  constructor(atpClient, options = {}) {
    this.atpClient = atpClient;
    this.batchSize = options.batchSize || 10;
    this.flushInterval = options.flushInterval || 5000; // 5 seconds
    this.queue = [];
    this.sendPromises = new Map();
    this.lastFlushed = Date.now();

    // Set up periodic flushing
    this.intervalId = setInterval(() => this.flushIfNeeded(), 1000);
  }

  /**
   * Queue a notification.
   * @returns Promise resolved with this notification's entry from the batch
   *   result, or rejected if it (or the whole batch request) failed.
   */
  async addNotification(notification) {
    return new Promise((resolve, reject) => {
      // Add to queue with resolve/reject handlers
      this.queue.push({
        notification,
        resolve,
        reject
      });

      // Flush immediately if batch size reached
      if (this.queue.length >= this.batchSize) {
        this.flush();
      }
    });
  }

  // Timer callback: flush when the interval elapsed and the queue has items
  flushIfNeeded() {
    const now = Date.now();
    if (this.queue.length > 0 && now - this.lastFlushed >= this.flushInterval) {
      this.flush();
    }
  }

  /**
   * Send up to `batchSize` queued notifications in one request and settle
   * their promises. Never rejects: failures are delivered per notification.
   */
  async flush() {
    if (this.queue.length === 0) return;

    // Taken synchronously, so concurrent flushes never share an item
    const batch = this.queue.splice(0, this.batchSize);
    this.lastFlushed = Date.now();

    try {
      // Create batch request
      const notifications = batch.map(item => item.notification);
      const results = await this.atpClient.sendBatchNotifications(notifications);
      const resultList = Array.isArray(results) ? results : [];

      // Iterate the batch, not the results: an item without a matching
      // result must be rejected, otherwise its promise would never settle.
      batch.forEach((item, index) => {
        const result = resultList[index];
        if (result && result.success) {
          item.resolve(result);
        } else {
          item.reject(new Error((result && result.error) || 'Failed to send notification'));
        }
      });
    } catch (error) {
      // If batch request fails, reject all promises
      batch.forEach(item => item.reject(error));
    }
  }

  /**
   * Stop the periodic timer and flush what is still queued.
   * @returns Promise that settles once the final flush has completed, so a
   *   caller may await it during shutdown.
   */
  destroy() {
    // Clean up interval on shutdown
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }

    // Flush any remaining items (no-op on an empty queue)
    return this.flush();
  }
}

// Usage
// Send a batch once 20 notifications are queued, or once 3 seconds have
// passed since the last flush.
const batchOptions = { batchSize: 20, flushInterval: 3000 };
const batchManager = new ATPBatchManager(atpClient, batchOptions);

// Send notifications through the batch manager
/**
 * Queue a "New Message" notification for a user through the batch manager.
 *
 * @returns The notification_id reported for this notification.
 * @throws Re-throws any failure after logging it.
 */
async function sendUserNotification(userId, message) {
  const notification = {
    user_id: userId,
    type: 'message',
    title: 'New Message',
    body: message
  };

  try {
    const { notification_id: notificationId } =
      await batchManager.addNotification(notification);
    return notificationId;
  } catch (error) {
    console.error('Failed to send notification:', error);
    throw error;
  }
}

Response Batching

class ResponseBatchProcessor:
    """Queue ATP responses and process them in batches grouped by action."""

    def __init__(self, max_batch_size=100):
        """
        Args:
            max_batch_size: Queue length that triggers processing, and the
                largest number of responses handled in one batch.
        """
        self.response_queue = []
        self.max_batch_size = max_batch_size
        self.processing = False

    def add_response(self, notification_id, action_id, response_data):
        """Add a response to the processing queue"""
        self.response_queue.append({
            'notification_id': notification_id,
            'action_id': action_id,
            'response_data': response_data
        })

        # Process batch if we've reached max size
        if len(self.response_queue) >= self.max_batch_size:
            self.process_batch()

    def process_batch(self):
        """Process queued responses, one batch at a time, until the queue is empty.

        A call made while processing is already in progress returns
        immediately. The queue is drained with a loop rather than recursion,
        so a large backlog cannot exhaust the call stack.

        If ``process_action_group`` raises, the exception propagates. The
        batch being processed has already been taken off the queue; batches
        not yet started stay queued for the next call.
        """
        if self.processing or not self.response_queue:
            return

        self.processing = True
        # Guard against a non-positive size, which would never make progress.
        batch_size = max(1, self.max_batch_size)

        try:
            while self.response_queue:
                # Get a batch of responses
                batch = self.response_queue[:batch_size]
                self.response_queue = self.response_queue[batch_size:]

                # Group by action type for efficient processing; groups keep
                # the order in which each action was first seen.
                action_groups = {}
                for response in batch:
                    action_groups.setdefault(response['action_id'], []).append(response)

                # Process each action group
                for action_id, responses in action_groups.items():
                    self.process_action_group(action_id, responses)
        finally:
            self.processing = False

    def process_action_group(self, action_id, responses):
        """Process a group of responses with the same action.

        Args:
            action_id: Action shared by every response in ``responses``.
            responses: Queue entries (``notification_id``, ``action_id``,
                ``response_data``).
        """
        # Example: Bulk database operations for the same action type
        if action_id == "approve":
            # Bulk approve all items
            notification_ids = [r['notification_id'] for r in responses]
            bulk_approve_items(notification_ids)

        elif action_id == "form_submit":
            # Bulk process form submissions
            form_data_list = [
                {'notification_id': r['notification_id'], 'data': r['response_data']}
                for r in responses
            ]
            bulk_process_form_submissions(form_data_list)

        # Update all notification statuses in one call. Responses with any
        # other action_id reach this point without further handling.
        update_notification_statuses(
            [(r['notification_id'], 'processed') for r in responses]
        )

# Helper functions for bulk operations
def bulk_approve_items(notification_ids):
    """Approve multiple items in a single database operation.

    Args:
        notification_ids: Iterable of notification IDs whose items are set to
            ``approved``. An empty iterable is a no-op.
    """
    ids = tuple(notification_ids)
    if not ids:
        # Nothing to approve: avoid issuing an "IN ()" statement (an empty
        # IN list is rejected by many databases) and an empty commit.
        return

    # Example using SQLAlchemy
    # NOTE(review): how a tuple bound to an IN clause is expanded depends on
    # the SQLAlchemy version/driver in use - confirm against your setup.
    db.session.execute(
        """
        UPDATE items
        SET status = 'approved', updated_at = now()
        WHERE notification_id IN :notification_ids
        """,
        {'notification_ids': ids}
    )
    db.session.commit()

def bulk_process_form_submissions(form_data_list):
    """Process multiple form submissions efficiently.

    Args:
        form_data_list: Sequence of dicts with the keys ``notification_id``
            and ``data``; ``data`` is stored JSON-encoded. An empty sequence
            is a no-op.
    """
    if not form_data_list:
        # Nothing to insert: skip the statement and the commit.
        return

    # Prepare data for bulk insert
    values = [
        {
            'notification_id': item['notification_id'],
            'form_data': json.dumps(item['data']),
            'processed_at': datetime.now()
        }
        for item in form_data_list
    ]

    # Bulk insert: one parameter set per submission
    db.session.execute(
        """
        INSERT INTO form_submissions (notification_id, form_data, processed_at)
        VALUES (:notification_id, :form_data, :processed_at)
        """,
        values
    )
    db.session.commit()

Response Verification Patterns

Implement robust verification for all incoming responses with these patterns.

Multi-Factor Verification

class ResponseVerifier:
    """Run several independent checks on an incoming ATP response.

    NOTE(review): ``SecurityException``, ``db`` and ``redis_client`` are not
    defined in this snippet; they must be provided by the surrounding
    application.
    """

    def __init__(self, atp_client, webhook_secret):
        """
        Args:
            atp_client: ATP client instance (stored, not used by the checks).
            webhook_secret: Shared secret (``str``) for the HMAC signature.
        """
        self.atp_client = atp_client
        self.webhook_secret = webhook_secret

    def verify_response(self, payload, signature, request_headers):
        """
        Comprehensive verification of an ATP response.

        Args:
            payload: Parsed webhook body (dict).
            signature: Hex signature sent with the request.
            request_headers: Mapping of request headers.

        Returns:
            ``True`` when every check passes.

        Raises:
            SecurityException: On the first check that fails.
        """
        # Step 1: Verify signature (cryptographic)
        if not self._verify_signature(payload, signature):
            raise SecurityException("Invalid signature")

        # Step 2: Verify timestamp (temporal)
        if not self._verify_timestamp(payload.get('timestamp')):
            raise SecurityException("Response too old or from the future")

        # Step 3: Verify notification ownership (application)
        notification_id = payload.get('notification_id')
        if not self._verify_notification_ownership(notification_id):
            raise SecurityException("Unknown notification ID")

        # Step 4: Verify IP address (network)
        # NOTE(review): X-Forwarded-For is set by the sender unless a trusted
        # proxy overwrites it; the first entry can be spoofed otherwise.
        client_ip = request_headers.get('X-Forwarded-For', '').split(',')[0].strip()
        if not self._verify_ip_address(client_ip):
            raise SecurityException("Unauthorized IP address")

        # Step 5: Verify rate limits (behavioral)
        if not self._verify_rate_limits(notification_id):
            raise SecurityException("Rate limit exceeded")

        # All verifications passed
        return True

    def _verify_signature(self, payload, signature):
        """Verify the HMAC-SHA256 signature of the key-sorted JSON payload.

        Returns ``False`` (instead of raising) when the signature is missing
        or is not a string.
        """
        if not isinstance(signature, str) or not signature:
            return False

        expected_signature = hmac.new(
            self.webhook_secret.encode(),
            json.dumps(payload, sort_keys=True).encode(),
            hashlib.sha256
        ).hexdigest()

        # Encode both sides: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(expected_signature.encode(), signature.encode())

    def _verify_timestamp(self, timestamp_str, max_age_seconds=300):
        """Verify the ISO 8601 timestamp is within ``max_age_seconds`` of now.

        Timestamps without a UTC offset, unparsable values and non-string
        values are rejected.
        """
        # Local import: this snippet's module-level imports do not bring
        # ``timezone`` into scope.
        from datetime import timezone

        if not timestamp_str:
            return False

        try:
            # Parse ISO8601 timestamp ("Z" suffix normalised to an offset)
            timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))

            # Subtracting a naive timestamp from an aware "now" raises
            # TypeError, which is handled below.
            now = datetime.now(timezone.utc)
            time_diff = abs((now - timestamp).total_seconds())

            return time_diff <= max_age_seconds
        except (ValueError, TypeError, AttributeError):
            return False

    def _verify_notification_ownership(self, notification_id):
        """Verify this notification was sent by this service"""
        # Check against database of sent notifications
        result = db.execute(
            "SELECT COUNT(*) FROM sent_notifications WHERE id = ?",
            (notification_id,)
        )
        return result.fetchone()[0] > 0

    def _verify_ip_address(self, client_ip):
        """Verify IP is from trusted ATP servers"""
        # Check against allowlist of ATP server IPs
        trusted_ips = {
            '192.0.2.1',  # Example ATP server IPs
            '192.0.2.2',
            # ...
        }
        return client_ip in trusted_ips

    def _verify_rate_limits(self, notification_id):
        """Allow at most 5 responses per notification within one hour.

        The counter is incremented first and the returned value is checked,
        so two concurrent requests cannot both pass on the same count.
        """
        key = f"response_count:{notification_id}"
        count = redis_client.incr(key)

        if count == 1:
            # First response in this window: start the 1 hour expiry.
            redis_client.expire(key, 3600)

        return count <= 5  # Example: max 5 responses per notification

Dual Verification with ATP Server

import { ATPClient } from '@atp/client';

/**
 * Verifies a webhook twice - locally by signature, then by asking the ATP
 * server - before handing the response to the application.
 */
class DualVerificationHandler {
  constructor(
    private atpClient: ATPClient,
    private webhookSecret: string
  ) {}

  /**
   * @returns true when the response was verified and processed; false when
   *   the server-side verification or the processing failed (the error is
   *   logged).
   * @throws Error('Invalid signature') when the local signature check fails.
   */
  async verifyAndProcessWebhook(payload: any, signature: string): Promise<boolean> {
    // Step 1: Local verification
    const signatureOk = this.verifySignature(payload, signature);
    if (!signatureOk) {
      throw new Error('Invalid signature');
    }

    const notificationId = payload.notification_id;
    const actionId = payload.action_id;
    const responseData = payload.response_data;

    // Step 2: Secondary verification with ATP server
    try {
      // Ask the ATP server directly whether this response is legitimate
      const { valid } = await this.atpClient.verifyResponse({
        notification_id: notificationId,
        action_id: actionId,
        verification_token: payload.verification_token
      });

      if (!valid) {
        throw new Error('Response verification failed');
      }

      // Process the verified response
      await this.processVerifiedResponse(notificationId, actionId, responseData);
      return true;
    } catch (error) {
      // Both verification and processing errors end up here
      console.error('Verification failed:', error);
      return false;
    }
  }

  private verifySignature(payload: any, signature: string): boolean {
    // Implementation of signature verification
    // ...
    // WARNING: placeholder - as written, every signature is accepted.
    return true; // Placeholder
  }

  private async processVerifiedResponse(
    notificationId: string,
    actionId: string,
    responseData: any
  ): Promise<void> {
    // Process the verified response
    // ...
  }
}

Reliable Transaction Processing

Ensure response processing is reliable with these transactional patterns.

Response Processing Saga

import java.util.UUID;

/**
 * Processes an ATP response as a sequence of tracked steps.
 *
 * Progress is recorded in the {@code response_processing} table
 * ("started" -> "recorded" -> "business_logic_executed" -> "completed", or
 * "failed"), so an interrupted run can be picked up by a recovery job.
 *
 * NOTE(review): {@code logger}, {@code recoveryService}, {@code Map} and
 * {@code JSON.stringify} are not declared or imported in this snippet; they
 * must be provided by the surrounding application.
 */
public class ResponseProcessor {
    private final Database db;
    private final ATPClient atpClient;

    public ResponseProcessor(Database db, ATPClient atpClient) {
        this.db = db;
        this.atpClient = atpClient;
    }

    /**
     * Records the response, runs the business logic and reports the result
     * to the ATP server.
     *
     * @param notificationId notification the response belongs to
     * @param actionId       action chosen by the user
     * @param responseData   data submitted with the response
     * @throws IllegalStateException if the business logic reports failure;
     *         any other exception raised by a step is rethrown after the run
     *         has been marked "failed" and recovery has been triggered
     */
    public void processResponse(String notificationId, String actionId, Map<String, Object> responseData) {
        // Create a unique processing ID for tracking
        String processingId = UUID.randomUUID().toString();

        try {
            // Start a processing record with status
            db.executeUpdate(
                "INSERT INTO response_processing (id, notification_id, status) VALUES (?, ?, ?)",
                processingId, notificationId, "started"
            );

            // Step 1: Record the response
            db.executeUpdate(
                "INSERT INTO notification_responses (notification_id, action_id, response_data, received_at) VALUES (?, ?, ?, NOW())",
                notificationId, actionId, JSON.stringify(responseData)
            );
            db.executeUpdate(
                "UPDATE response_processing SET status = ? WHERE id = ?",
                "recorded", processingId
            );

            // Step 2: Execute business logic
            boolean success = executeBusinessLogic(notificationId, actionId, responseData);
            if (!success) {
                // Do not record a failed run as executed/completed; route it
                // through the failure handling below instead.
                throw new IllegalStateException(
                    "Business logic failed for notification " + notificationId
                );
            }
            db.executeUpdate(
                "UPDATE response_processing SET status = ? WHERE id = ?",
                "business_logic_executed", processingId
            );

            // Step 3: Update notification status on ATP server
            atpClient.updateNotificationStatus(notificationId, "processed");
            db.executeUpdate(
                "UPDATE response_processing SET status = ? WHERE id = ?",
                "completed", processingId
            );

        } catch (Exception e) {
            // Log the error
            logger.error("Error processing response: " + e.getMessage(), e);

            // Update processing status to failed; a failure here is logged
            // only, so it cannot hide the original exception.
            try {
                db.executeUpdate(
                    "UPDATE response_processing SET status = ?, error_message = ? WHERE id = ?",
                    "failed", e.getMessage(), processingId
                );
            } catch (Exception ex) {
                logger.error("Failed to update processing status", ex);
            }

            // Trigger recovery process if needed
            triggerRecoveryProcess(processingId);

            // Throw the exception to notify the webhook handler
            throw e;
        }
    }

    /**
     * Applies the application's business logic to the response.
     *
     * @return {@code true} on success; {@code false} makes
     *         {@link #processResponse} fail the run
     */
    private boolean executeBusinessLogic(String notificationId, String actionId, Map<String, Object> responseData) {
        // Implement your business logic based on the response
        // ...
        return true;
    }

    /** Schedules a recovery job for the given processing run. */
    private void triggerRecoveryProcess(String processingId) {
        // Schedule a recovery job
        recoveryService.scheduleRecovery(processingId);
    }
}

Distributed Transaction Handling

For complex workflows that span multiple services:
import uuid
from datetime import datetime
import json

class DistributedTransactionManager:
    """Process an ATP response across a database, a message broker and ATP.

    Every step is appended to the ``transaction_logs`` table so that a
    transaction that stopped part-way can be inspected and recovered.
    """

    # Log entry written when a transaction fails; it marks a failure, not a
    # completed step.
    _FAILED_STEP = "transaction_failed"

    def __init__(self, db_conn, message_broker, atp_client):
        """
        Args:
            db_conn: Connection exposing ``execute(sql, params)``.
            message_broker: Broker exposing ``publish(topic, message)``.
            atp_client: Client exposing ``update_notification_status``.
        """
        self.db_conn = db_conn
        self.message_broker = message_broker
        self.atp_client = atp_client

    def process_response(self, notification_id, action_id, response_data):
        """Record, publish and acknowledge a response as one logged transaction.

        Returns:
            ``{"status": "success", "transaction_id": <id>}``.

        Raises:
            Exception: Whatever a step raised. The failure is first recorded
                and announced on ``atp.transaction.failures`` (best effort),
                then the original exception is re-raised unchanged.
        """
        # Create transaction ID
        transaction_id = str(uuid.uuid4())

        # Record transaction start
        self.record_transaction_step(
            transaction_id,
            notification_id,
            "transaction_started"
        )

        try:
            # Step 1: Record the response locally
            self.db_conn.execute(
                """
                INSERT INTO notification_responses 
                (notification_id, action_id, response_data, transaction_id, received_at)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (notification_id, action_id, json.dumps(response_data), 
                transaction_id, datetime.now())
            )

            self.record_transaction_step(
                transaction_id,
                notification_id,
                "response_recorded"
            )

            # Step 2: Publish event to message broker for downstream processing
            self.message_broker.publish(
                "atp.responses",
                {
                    "transaction_id": transaction_id,
                    "notification_id": notification_id,
                    "action_id": action_id,
                    "response_data": response_data,
                    "timestamp": datetime.now().isoformat()
                }
            )

            self.record_transaction_step(
                transaction_id,
                notification_id,
                "event_published"
            )

            # Step 3: Update the notification status on ATP server
            self.atp_client.update_notification_status(
                notification_id, 
                status="received",
                metadata={"transaction_id": transaction_id}
            )

            self.record_transaction_step(
                transaction_id,
                notification_id,
                "atp_status_updated"
            )

            # Mark transaction as complete
            self.record_transaction_step(
                transaction_id,
                notification_id,
                "transaction_completed"
            )

            return {
                "status": "success",
                "transaction_id": transaction_id
            }

        except Exception as e:
            self._report_failure(transaction_id, notification_id, e)
            raise

    def _report_failure(self, transaction_id, notification_id, error):
        """Record and publish a failure without ever raising.

        The database or broker may be the very component that failed, so each
        reporting action is attempted independently and its own errors are
        only logged. This keeps the original exception visible to the caller.
        """
        import logging

        log = logging.getLogger(__name__)

        try:
            self.record_transaction_step(
                transaction_id,
                notification_id,
                self._FAILED_STEP,
                error_message=str(error)
            )
        except Exception:
            log.exception("Could not record failure of transaction %s", transaction_id)

        try:
            # Publish failure event for recovery
            self.message_broker.publish(
                "atp.transaction.failures",
                {
                    "transaction_id": transaction_id,
                    "notification_id": notification_id,
                    "error": str(error),
                    "timestamp": datetime.now().isoformat()
                }
            )
        except Exception:
            log.exception("Could not publish failure of transaction %s", transaction_id)

    def record_transaction_step(self, transaction_id, notification_id, step, error_message=None):
        """Record a step in the transaction process"""
        self.db_conn.execute(
            """
            INSERT INTO transaction_logs 
            (transaction_id, notification_id, step, error_message, timestamp)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (transaction_id, notification_id, step, error_message, datetime.now())
        )

    def recover_transaction(self, transaction_id):
        """Attempt to recover a failed transaction.

        Raises:
            ValueError: If no log entries exist for ``transaction_id``.
        """
        # Get transaction details
        result = self.db_conn.execute(
            "SELECT * FROM transaction_logs WHERE transaction_id = %s ORDER BY timestamp",
            (transaction_id,)
        )
        steps = result.fetchall()

        if not steps:
            raise ValueError(f"No transaction logs found for {transaction_id}")

        # Determine last successful step. A failed transaction ends with a
        # "transaction_failed" entry, so that entry is skipped; looking only
        # at the final row would hide how far the transaction got.
        last_step = next(
            (row['step'] for row in reversed(steps) if row['step'] != self._FAILED_STEP),
            None
        )
        notification_id = steps[0]['notification_id']

        # Recovery logic based on where the transaction failed
        if last_step == "response_recorded":
            # Failed after recording response, retry publishing
            # ...
            pass
        elif last_step == "event_published":
            # Failed after publishing, retry ATP status update
            # ...
            pass
        else:
            # Failed at any other point, manual intervention required
            # ...
            pass

Middleware Integration

Create middleware that integrates ATP seamlessly into your existing application architecture.

Express.js Middleware

// ATP middleware for Express.js
const crypto = require('crypto');

/**
 * Build an Express middleware that authenticates ATP webhook calls and
 * attaches the parsed ATP fields to `req.atp`.
 *
 * @param {object} [options]
 * @param {string} options.webhookSecret - Secret used to verify signatures (required).
 * @param {object} options.atpClient - ATP client exposed to handlers (required).
 * @param {string} [options.path='/atp/webhook'] - Route the middleware guards.
 * @returns {Function} Express middleware `(req, res, next)`.
 * @throws {Error} When `webhookSecret` or `atpClient` is missing.
 */
function atpMiddleware(options = {}) {
  const { webhookSecret, atpClient } = options;
  const path = options.path || '/atp/webhook';

  // Fail at setup time rather than on the first webhook call
  if (!webhookSecret) {
    throw new Error('ATP webhook secret is required');
  }
  if (!atpClient) {
    throw new Error('ATP client is required');
  }

  return (req, res, next) => {
    // Anything that is not a POST to the webhook path passes straight through
    const isWebhookCall = req.path === path && req.method === 'POST';
    if (!isWebhookCall) {
      return next();
    }

    // Reject calls whose signature does not match the body
    const payload = req.body;
    const signature = req.headers['x-atp-signature'];
    const authentic = verifySignature(payload, signature, webhookSecret);
    if (!authentic) {
      return res.status(401).json({ error: 'Invalid signature' });
    }

    // Expose the ATP fields and a status helper to downstream handlers
    const atpContext = {
      notification_id: payload.notification_id,
      action_id: payload.action_id,
      response_data: payload.response_data,
      timestamp: payload.timestamp,
      client: atpClient,

      // Forwards to the client using this notification's id
      updateStatus: async (status, metadata) => {
        const notificationId = payload.notification_id;
        return atpClient.updateNotificationStatus(notificationId, status, metadata);
      }
    };
    req.atp = atpContext;

    // Hand over to the route handler
    next();
  };
}

/**
 * Check that `signature` is the hex HMAC-SHA256 of the serialized payload.
 *
 * @param {object} payload - Parsed webhook body.
 * @param {string} signature - Hex signature received with the request.
 * @param {string} secret - Shared webhook secret.
 * @returns {boolean} `true` only when the signature matches; `false` for a
 *   missing argument, a mismatch, or a signature of the wrong length.
 */
function verifySignature(payload, signature, secret) {
  if (!payload || !signature || !secret) {
    return false;
  }

  // NOTE(review): an array replacer is an allow-list applied at every nesting
  // level, so nested properties whose names are not also top-level keys are
  // left out of the signed string. Confirm this matches how ATP signs payloads.
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(JSON.stringify(payload, Object.keys(payload).sort()))
    .digest('hex');

  const expectedBuffer = Buffer.from(expectedSignature);
  const receivedBuffer = Buffer.from(signature);

  // timingSafeEqual throws when the buffers differ in length; a signature of
  // the wrong length is simply invalid, so report it as a mismatch instead.
  if (expectedBuffer.length !== receivedBuffer.length) {
    return false;
  }

  return crypto.timingSafeEqual(expectedBuffer, receivedBuffer);
}

// Usage
const express = require('express');
const { ATPClient } = require('@atp/client');

const app = express();
// JSON bodies must be parsed before the ATP middleware reads req.body
app.use(express.json());

const atpClient = new ATPClient({
  serviceId: 'your-service-id',
  apiKey: process.env.ATP_API_KEY
});

// Mount the ATP middleware on the webhook route
app.use(
  atpMiddleware({
    webhookSecret: process.env.ATP_WEBHOOK_SECRET,
    atpClient,
    path: '/webhooks/atp'
  })
);

// Webhook route: runs after the middleware has populated req.atp
app.post('/webhooks/atp', (req, res) => {
  const { notification_id, action_id, response_data, updateStatus } = req.atp;

  // Success path: mark the notification processed, then acknowledge
  const acknowledge = async () => {
    await updateStatus('processed');
    res.status(200).json({ status: 'success' });
  };

  // Failure path: log the error and answer with a 500
  const reportFailure = (err) => {
    console.error('Error processing response:', err);
    res.status(500).json({ error: 'Failed to process response' });
  };

  handleResponse(notification_id, action_id, response_data)
    .then(acknowledge)
    .catch(reportFailure);
});

/**
 * Placeholder for application-specific handling of an ATP response.
 *
 * @param {string} notificationId - Notification the response belongs to.
 * @param {string} actionId - Action carried by the response.
 * @param {*} responseData - Data supplied with the response.
 * @returns {Promise<void>} Promise that resolves once handling is complete.
 */
async function handleResponse(notificationId, actionId, responseData) {
  // Your business logic here
}

Django Middleware

from django.utils.deprecation import MiddlewareMixin
import hmac
import hashlib
import json

class ATPMiddleware(MiddlewareMixin):
    """Django middleware that authenticates ATP webhook requests.

    For POST requests to the configured webhook path, the JSON body is parsed,
    its HMAC-SHA256 signature is checked against the ``X-ATP-Signature``
    header, and the ATP fields are attached to the request as ``request.atp``.
    All other requests pass through untouched.

    Settings read:
        ATP_WEBHOOK_PATH: Path to guard (default ``/api/atp/webhook/``).
        ATP_WEBHOOK_SECRET: Shared secret used for signature checks (required).
    """

    def __init__(self, get_response):
        """Load ATP settings.

        Raises:
            ValueError: If ``ATP_WEBHOOK_SECRET`` is not configured.
        """
        super().__init__(get_response)
        from django.conf import settings

        self.webhook_path = getattr(settings, 'ATP_WEBHOOK_PATH', '/api/atp/webhook/')
        self.webhook_secret = getattr(settings, 'ATP_WEBHOOK_SECRET', None)

        if not self.webhook_secret:
            raise ValueError("ATP_WEBHOOK_SECRET must be set in Django settings")

    def process_view(self, request, view_func, view_args, view_kwargs):
        """Validate an ATP webhook request before its view runs.

        Returns:
            ``None`` to continue to the view, or a ``JsonResponse`` with
            status 401 (bad signature) or 400 (malformed request).
        """
        # Imported here so the snippet is self-contained; the error paths
        # below would otherwise fail with NameError.
        from django.http import JsonResponse

        # Only process ATP webhook requests
        if request.path != self.webhook_path or request.method != 'POST':
            return None

        # Verify signature
        try:
            signature = request.headers.get('X-ATP-Signature')
            payload = json.loads(request.body)

            if not self.verify_signature(payload, signature):
                return JsonResponse({'error': 'Invalid signature'}, status=401)

            # Valid JSON that is not an object (e.g. a list) has no fields to
            # read; reject it instead of failing on ``.get`` below.
            if not isinstance(payload, dict):
                raise ValueError('payload must be a JSON object')

            # Add ATP data to request
            request.atp = {
                'notification_id': payload.get('notification_id'),
                'action_id': payload.get('action_id'),
                'response_data': payload.get('response_data'),
                'raw_payload': payload,
            }

        except (json.JSONDecodeError, ValueError) as e:
            return JsonResponse({'error': f'Invalid request: {str(e)}'}, status=400)

        return None  # Continue to view

    def verify_signature(self, payload, signature):
        """Return True when ``signature`` is the hex HMAC-SHA256 of ``payload``.

        The payload is serialized with sorted keys and signed with the
        configured webhook secret. A missing payload or signature yields False.
        """
        if not payload or not signature:
            return False

        expected_signature = hmac.new(
            self.webhook_secret.encode(),
            json.dumps(payload, sort_keys=True).encode(),
            hashlib.sha256
        ).hexdigest()

        # compare_digest only accepts str arguments that are pure ASCII, so
        # compare as bytes; a non-ASCII header then counts as a mismatch
        # rather than raising TypeError.
        if isinstance(signature, str):
            signature = signature.encode()

        return hmac.compare_digest(expected_signature.encode(), signature)

Verification: Knowledge Check

Before implementing these advanced patterns, verify your understanding:

Next Steps

Now that you understand advanced ATP patterns, explore other ATP client guides:

Processing Responses

Learn how to securely handle and process user responses to your ATP notifications

Creating Notifications

Discover how to craft effective notifications that engage users
I