Advanced Integration Patterns
Once you’ve mastered the basics of ATP integration, these advanced patterns can help you build more robust, maintainable, and efficient implementations.The Decorator Pattern
The decorator pattern provides a clean way to separate ATP functionality from your core business logic.Python Decorator Example
Copy
import functools
from typing import Callable, Dict, Any
def notify_on_completion(notification_type: str, title_template: str):
"""
Decorator that sends an ATP notification after the function completes.
Args:
notification_type: The type of notification to send
title_template: A template string for the notification title
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Execute the original function
result = func(*args, **kwargs)
# Extract context for the notification
context = {
"function_name": func.__name__,
"args": args,
"kwargs": kwargs,
"result": result
}
# Create notification title from template
title = title_template.format(**context)
# Send ATP notification
send_atp_notification(
notification_type=notification_type,
title=title,
context=context
)
return result
return wrapper
return decorator
# Usage example
@notify_on_completion(
notification_type="task_completed",
title_template="Task {function_name} completed successfully"
)
def process_order(order_id: str, user_id: str):
# Process the order...
return {"status": "completed", "order_id": order_id}
TypeScript Decorator Example
Copy
import { ATPClient } from '@atp/client';
// Notification decorator factory
function NotifyOnCompletion(notificationType: string, titleTemplate: string) {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor
) {
const originalMethod = descriptor.value;
descriptor.value = async function (...args: any[]) {
// Call the original method
const result = await originalMethod.apply(this, args);
// Extract context for the notification
const context = {
functionName: propertyKey,
args,
result
};
// Create notification title from template
const title = titleTemplate.replace(/{(\w+)}/g, (_, key) =>
context[key] !== undefined ? context[key] : '{' + key + '}'
);
// Send ATP notification
const atpClient = new ATPClient();
await atpClient.sendNotification({
type: notificationType,
title,
context
});
return result;
};
return descriptor;
};
}
// Usage example
class OrderService {
@NotifyOnCompletion('task_completed', 'Order {args[0]} processed successfully')
async processOrder(orderId: string, userId: string) {
// Process the order...
return { status: 'completed', orderId };
}
}
Benefits of the Decorator Approach
- Separation of Concerns: Keeps notification logic separate from core business logic
- Reusability: Apply the same notification patterns across different functions
- Configurability: Customize notifications without changing the underlying code
- Testability: Test your core logic independently from ATP integrations
Polling vs. Webhooks
ATP supports both polling and webhook mechanisms for receiving responses. Here’s how to implement each efficiently:Webhook Implementation
Copy
# Webhook handler with advanced features
from flask import Flask, request, jsonify
import hmac
import hashlib
import json
import redis
app = Flask(__name__)
redis_client = redis.Redis()
@app.route('/atp/webhook', methods=['POST'])
def handle_atp_webhook():
# Get the webhook payload
payload = request.json
signature = request.headers.get('X-ATP-Signature')
# Verify the signature
if not verify_signature(payload, signature):
return jsonify({"error": "Invalid signature"}), 401
# Check for duplicate webhook (idempotency)
webhook_id = payload.get('webhook_id')
if webhook_id and redis_client.exists(f"processed_webhook:{webhook_id}"):
return jsonify({"status": "already processed"}), 200
# Process the webhook
try:
notification_id = payload['notification_id']
action_id = payload['action_id']
response_data = payload['response_data']
process_response(notification_id, action_id, response_data)
# Mark webhook as processed
if webhook_id:
redis_client.setex(f"processed_webhook:{webhook_id}", 86400, "1")
return jsonify({"status": "success"}), 200
except Exception as e:
# Log the error but return 200 to prevent retries if we've seen it
if webhook_id:
redis_client.setex(f"processed_webhook:{webhook_id}", 86400, "error")
return jsonify({"status": "error", "message": str(e)}), 500
Polling Implementation
Copy
import { ATPClient } from '@atp/client';
class ResponsePoller {
private atpClient: ATPClient;
private pollingInterval: number; // milliseconds
private isPolling: boolean = false;
private intervalId?: NodeJS.Timeout;
private notificationIds: string[] = [];
constructor(atpClient: ATPClient, pollingInterval: number = 60000) {
this.atpClient = atpClient;
this.pollingInterval = pollingInterval;
}
// Add a notification to poll for
public addNotification(notificationId: string): void {
if (!this.notificationIds.includes(notificationId)) {
this.notificationIds.push(notificationId);
// Start polling if not already started
if (!this.isPolling) {
this.startPolling();
}
}
}
// Remove a notification from polling
public removeNotification(notificationId: string): void {
this.notificationIds = this.notificationIds.filter(id => id !== notificationId);
// Stop polling if no more notifications to poll
if (this.notificationIds.length === 0 && this.isPolling) {
this.stopPolling();
}
}
// Start the polling process
private startPolling(): void {
if (this.isPolling) return;
this.isPolling = true;
this.poll(); // Poll immediately
this.intervalId = setInterval(() => this.poll(), this.pollingInterval);
}
// Stop the polling process
private stopPolling(): void {
if (!this.isPolling) return;
this.isPolling = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = undefined;
}
}
// Poll for responses
private async poll(): Promise<void> {
if (this.notificationIds.length === 0) return;
try {
// Request responses for multiple notifications in batch
const responses = await this.atpClient.getResponses(this.notificationIds);
// Process each response
for (const response of responses) {
// Process based on status
if (response.status === 'responded') {
this.processResponse(response);
this.removeNotification(response.notification_id);
} else if (response.status === 'expired') {
this.handleExpiredNotification(response.notification_id);
this.removeNotification(response.notification_id);
}
}
} catch (error) {
console.error('Polling error:', error);
// Implement exponential backoff or other error handling
}
}
private processResponse(response: any): void {
// Process the response based on your business logic
const { notification_id, action_id, response_data } = response;
// Handle the response...
console.log(`Processed response for notification ${notification_id}`);
// Notify your application of the response
// e.g., emit an event, call a callback, etc.
}
private handleExpiredNotification(notificationId: string): void {
// Handle expired notifications
console.log(`Notification ${notificationId} expired without response`);
// Notify your application that the notification expired
}
}
// Usage
const atpClient = new ATPClient({/* config */});
const poller = new ResponsePoller(atpClient, 30000); // Poll every 30 seconds
// When sending a notification
const notificationId = await atpClient.sendNotification({/* notification */});
poller.addNotification(notificationId);
Choosing Between Polling and Webhooks
Approach | Advantages | Disadvantages | Best For |
---|---|---|---|
Webhooks | - Real-time updates - Lower latency - Reduced server load | - Requires public endpoint - More complex setup - Needs retry handling | Services with public endpoints that need immediate responses |
Polling | - Works behind firewalls - Simpler implementation - More control over timing | - Higher latency - More API calls - Potential rate limiting | Development environments, internal services, or when webhooks aren’t feasible |
Timeout Handling
Implement robust timeout mechanisms to handle cases where responses don’t arrive within expected timeframes.Deadline-Based Processing
Copy
import time
from datetime import datetime, timedelta
class NotificationManager:
def __init__(self, atp_client):
self.atp_client = atp_client
self.pending_notifications = {} # notification_id -> expiry_time
def send_notification(self, notification_data, timeout_seconds=3600):
# Send the notification
response = self.atp_client.send_notification(notification_data)
notification_id = response['notification_id']
# Set expiry time
expiry_time = datetime.now() + timedelta(seconds=timeout_seconds)
# Store in pending notifications
self.pending_notifications[notification_id] = {
'expiry_time': expiry_time,
'data': notification_data,
'timeout_seconds': timeout_seconds
}
return notification_id
def process_response(self, notification_id, action_id, response_data):
# Process the response
# ...
# Remove from pending
if notification_id in self.pending_notifications:
del self.pending_notifications[notification_id]
def check_timeouts(self):
"""Check for timed-out notifications and handle them"""
now = datetime.now()
timed_out = []
# Find expired notifications
for notification_id, info in self.pending_notifications.items():
if now > info['expiry_time']:
timed_out.append((notification_id, info))
# Process timeouts
for notification_id, info in timed_out:
self.handle_timeout(notification_id, info)
del self.pending_notifications[notification_id]
def handle_timeout(self, notification_id, info):
"""Handle a notification timeout"""
# Log the timeout
print(f"Notification {notification_id} timed out after {info['timeout_seconds']} seconds")
# Update the notification status on ATP server
self.atp_client.update_notification_status(
notification_id,
status="cancelled",
reason="Response timeout exceeded"
)
# Execute fallback logic
# e.g., make a default decision, notify admin, etc.
self.execute_timeout_fallback(notification_id, info['data'])
def execute_timeout_fallback(self, notification_id, original_data):
"""Execute fallback logic for timed out notification"""
# Implement your timeout fallback strategy
# For example:
# - Apply a default decision
# - Escalate to a human
# - Retry with a different approach
pass
def start_timeout_checker(self, check_interval=60):
"""Start a background thread that checks for timeouts"""
import threading
def check_loop():
while True:
try:
self.check_timeouts()
except Exception as e:
print(f"Error in timeout checker: {e}")
time.sleep(check_interval)
thread = threading.Thread(target=check_loop, daemon=True)
thread.start()
return thread
Timeout Strategies
Strategy | Implementation | Use Cases |
---|---|---|
Default Action | After timeout, apply a conservative default decision | Approval workflows where denial is the safe default |
Escalation | Route to human supervisor or higher-priority notification | Critical decisions requiring human judgment |
Retry | Send a follow-up notification with urgency marker | When user response is essential |
Graceful Degradation | Proceed with limited functionality | When system can function with reduced capabilities |
Transaction Abortion | Cancel the entire operation | When partial completion would cause inconsistency |
Batching Techniques
Optimize performance by batching notifications and processing responses in groups.Notification Batching
Copy
// BatchManager for ATP notifications
class ATPBatchManager {
constructor(atpClient, options = {}) {
this.atpClient = atpClient;
this.batchSize = options.batchSize || 10;
this.flushInterval = options.flushInterval || 5000; // 5 seconds
this.queue = [];
this.sendPromises = new Map();
this.lastFlushed = Date.now();
// Set up periodic flushing
this.intervalId = setInterval(() => this.flushIfNeeded(), 1000);
}
async addNotification(notification) {
return new Promise((resolve, reject) => {
// Add to queue with resolve/reject handlers
this.queue.push({
notification,
resolve,
reject
});
// Flush immediately if batch size reached
if (this.queue.length >= this.batchSize) {
this.flush();
}
});
}
flushIfNeeded() {
const now = Date.now();
// Flush if interval elapsed and queue has items
if (this.queue.length > 0 && now - this.lastFlushed >= this.flushInterval) {
this.flush();
}
}
async flush() {
if (this.queue.length === 0) return;
const batch = this.queue.splice(0, this.batchSize);
this.lastFlushed = Date.now();
try {
// Create batch request
const notifications = batch.map(item => item.notification);
const results = await this.atpClient.sendBatchNotifications(notifications);
// Resolve promises with corresponding results
results.forEach((result, index) => {
if (result.success) {
batch[index].resolve(result);
} else {
batch[index].reject(new Error(result.error || 'Failed to send notification'));
}
});
} catch (error) {
// If batch request fails, reject all promises
batch.forEach(item => item.reject(error));
}
}
destroy() {
// Clean up interval on shutdown
if (this.intervalId) {
clearInterval(this.intervalId);
}
// Flush any remaining items
if (this.queue.length > 0) {
this.flush();
}
}
}
// Usage
const batchManager = new ATPBatchManager(atpClient, {
batchSize: 20,
flushInterval: 3000
});
// Send notifications through the batch manager
async function sendUserNotification(userId, message) {
try {
const result = await batchManager.addNotification({
user_id: userId,
type: 'message',
title: 'New Message',
body: message
});
return result.notification_id;
} catch (error) {
console.error('Failed to send notification:', error);
throw error;
}
}
Response Batching
Copy
class ResponseBatchProcessor:
def __init__(self, max_batch_size=100):
self.response_queue = []
self.max_batch_size = max_batch_size
self.processing = False
def add_response(self, notification_id, action_id, response_data):
"""Add a response to the processing queue"""
self.response_queue.append({
'notification_id': notification_id,
'action_id': action_id,
'response_data': response_data
})
# Process batch if we've reached max size
if len(self.response_queue) >= self.max_batch_size:
self.process_batch()
def process_batch(self):
"""Process a batch of responses"""
if self.processing or not self.response_queue:
return
self.processing = True
try:
# Get a batch of responses
batch = self.response_queue[:self.max_batch_size]
self.response_queue = self.response_queue[self.max_batch_size:]
# Group by action type for efficient processing
action_groups = {}
for response in batch:
action_id = response['action_id']
if action_id not in action_groups:
action_groups[action_id] = []
action_groups[action_id].append(response)
# Process each action group
for action_id, responses in action_groups.items():
self.process_action_group(action_id, responses)
finally:
self.processing = False
# If more items in queue, process next batch
if self.response_queue:
self.process_batch()
def process_action_group(self, action_id, responses):
"""Process a group of responses with the same action"""
# Example: Bulk database operations for the same action type
if action_id == "approve":
# Bulk approve all items
notification_ids = [r['notification_id'] for r in responses]
bulk_approve_items(notification_ids)
elif action_id == "form_submit":
# Bulk process form submissions
form_data_list = [
{'notification_id': r['notification_id'], 'data': r['response_data']}
for r in responses
]
bulk_process_form_submissions(form_data_list)
# Update all notification statuses in one call
update_notification_statuses(
[(r['notification_id'], 'processed') for r in responses]
)
# Helper functions for bulk operations
def bulk_approve_items(notification_ids):
"""Approve multiple items in a single database operation"""
# Example using SQLAlchemy
db.session.execute(
"""
UPDATE items
SET status = 'approved', updated_at = now()
WHERE notification_id IN :notification_ids
""",
{'notification_ids': tuple(notification_ids)}
)
db.session.commit()
def bulk_process_form_submissions(form_data_list):
"""Process multiple form submissions efficiently"""
# Prepare data for bulk insert
values = [
{
'notification_id': item['notification_id'],
'form_data': json.dumps(item['data']),
'processed_at': datetime.now()
}
for item in form_data_list
]
# Bulk insert
db.session.execute(
"""
INSERT INTO form_submissions (notification_id, form_data, processed_at)
VALUES (:notification_id, :form_data, :processed_at)
""",
values
)
db.session.commit()
Response Verification Patterns
Implement robust verification for all incoming responses with these patterns.Multi-Factor Verification
Copy
class ResponseVerifier:
def __init__(self, atp_client, webhook_secret):
self.atp_client = atp_client
self.webhook_secret = webhook_secret
def verify_response(self, payload, signature, request_headers):
"""
Comprehensive verification of an ATP response
"""
# Step 1: Verify signature (cryptographic)
if not self._verify_signature(payload, signature):
raise SecurityException("Invalid signature")
# Step 2: Verify timestamp (temporal)
if not self._verify_timestamp(payload.get('timestamp')):
raise SecurityException("Response too old or from the future")
# Step 3: Verify notification ownership (application)
notification_id = payload.get('notification_id')
if not self._verify_notification_ownership(notification_id):
raise SecurityException("Unknown notification ID")
# Step 4: Verify IP address (network)
client_ip = request_headers.get('X-Forwarded-For', '').split(',')[0].strip()
if not self._verify_ip_address(client_ip):
raise SecurityException("Unauthorized IP address")
# Step 5: Verify rate limits (behavioral)
if not self._verify_rate_limits(notification_id):
raise SecurityException("Rate limit exceeded")
# All verifications passed
return True
def _verify_signature(self, payload, signature):
"""Verify HMAC signature"""
expected_signature = hmac.new(
self.webhook_secret.encode(),
json.dumps(payload, sort_keys=True).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected_signature, signature)
def _verify_timestamp(self, timestamp_str, max_age_seconds=300):
"""Verify timestamp is recent"""
if not timestamp_str:
return False
try:
# Parse ISO8601 timestamp
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
# Check if timestamp is in acceptable range
now = datetime.now(timezone.utc)
time_diff = abs((now - timestamp).total_seconds())
return time_diff <= max_age_seconds
except (ValueError, TypeError):
return False
def _verify_notification_ownership(self, notification_id):
"""Verify this notification was sent by this service"""
# Check against database of sent notifications
result = db.execute(
"SELECT COUNT(*) FROM sent_notifications WHERE id = ?",
(notification_id,)
)
return result.fetchone()[0] > 0
def _verify_ip_address(self, client_ip):
"""Verify IP is from trusted ATP servers"""
# Check against allowlist of ATP server IPs
trusted_ips = [
'192.0.2.1', # Example ATP server IPs
'192.0.2.2',
# ...
]
return client_ip in trusted_ips
def _verify_rate_limits(self, notification_id):
"""Verify not exceeding rate limits for responses"""
# Check recent responses count in Redis
key = f"response_count:{notification_id}"
count = redis_client.get(key)
if count and int(count) > 5: # Example: max 5 responses per notification
return False
# Increment counter with expiry
redis_client.incr(key)
redis_client.expire(key, 3600) # 1 hour expiry
return True
Dual Verification with ATP Server
Copy
import { ATPClient } from '@atp/client';
class DualVerificationHandler {
private atpClient: ATPClient;
private webhookSecret: string;
constructor(atpClient: ATPClient, webhookSecret: string) {
this.atpClient = atpClient;
this.webhookSecret = webhookSecret;
}
async verifyAndProcessWebhook(payload: any, signature: string): Promise<boolean> {
// Step 1: Local verification
if (!this.verifySignature(payload, signature)) {
throw new Error('Invalid signature');
}
const { notification_id, action_id, response_data } = payload;
// Step 2: Secondary verification with ATP server
try {
// Verify response is legitimate by querying the ATP server directly
const verificationResult = await this.atpClient.verifyResponse({
notification_id,
action_id,
verification_token: payload.verification_token
});
if (!verificationResult.valid) {
throw new Error('Response verification failed');
}
// Process the verified response
await this.processVerifiedResponse(notification_id, action_id, response_data);
return true;
} catch (error) {
console.error('Verification failed:', error);
return false;
}
}
private verifySignature(payload: any, signature: string): boolean {
// Implementation of signature verification
// ...
return true; // Placeholder
}
private async processVerifiedResponse(
notificationId: string,
actionId: string,
responseData: any
): Promise<void> {
// Process the verified response
// ...
}
}
Reliable Transaction Processing
Ensure response processing is reliable with these transactional patterns.Response Processing Saga
Copy
import java.util.UUID;
public class ResponseProcessor {
private final Database db;
private final ATPClient atpClient;
public ResponseProcessor(Database db, ATPClient atpClient) {
this.db = db;
this.atpClient = atpClient;
}
public void processResponse(String notificationId, String actionId, Map<String, Object> responseData) {
// Create a unique processing ID for tracking
String processingId = UUID.randomUUID().toString();
try {
// Start a processing record with status
db.executeUpdate(
"INSERT INTO response_processing (id, notification_id, status) VALUES (?, ?, ?)",
processingId, notificationId, "started"
);
// Step 1: Record the response
db.executeUpdate(
"INSERT INTO notification_responses (notification_id, action_id, response_data, received_at) VALUES (?, ?, ?, NOW())",
notificationId, actionId, JSON.stringify(responseData)
);
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"recorded", processingId
);
// Step 2: Execute business logic
boolean success = executeBusinessLogic(notificationId, actionId, responseData);
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"business_logic_executed", processingId
);
// Step 3: Update notification status on ATP server
atpClient.updateNotificationStatus(notificationId, "processed");
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"completed", processingId
);
} catch (Exception e) {
// Log the error
logger.error("Error processing response: " + e.getMessage(), e);
// Update processing status to failed
try {
db.executeUpdate(
"UPDATE response_processing SET status = ?, error_message = ? WHERE id = ?",
"failed", e.getMessage(), processingId
);
} catch (Exception ex) {
logger.error("Failed to update processing status", ex);
}
// Trigger recovery process if needed
triggerRecoveryProcess(processingId);
// Throw the exception to notify the webhook handler
throw e;
}
}
private boolean executeBusinessLogic(String notificationId, String actionId, Map<String, Object> responseData) {
// Implement your business logic based on the response
// ...
return true;
}
private void triggerRecoveryProcess(String processingId) {
// Schedule a recovery job
recoveryService.scheduleRecovery(processingId);
}
}
Distributed Transaction Handling
For complex workflows that span multiple services:Copy
import uuid
from datetime import datetime
import json
class DistributedTransactionManager:
def __init__(self, db_conn, message_broker, atp_client):
self.db_conn = db_conn
self.message_broker = message_broker
self.atp_client = atp_client
def process_response(self, notification_id, action_id, response_data):
# Create transaction ID
transaction_id = str(uuid.uuid4())
# Record transaction start
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_started"
)
try:
# Step 1: Record the response locally
self.db_conn.execute(
"""
INSERT INTO notification_responses
(notification_id, action_id, response_data, transaction_id, received_at)
VALUES (%s, %s, %s, %s, %s)
""",
(notification_id, action_id, json.dumps(response_data),
transaction_id, datetime.now())
)
self.record_transaction_step(
transaction_id,
notification_id,
"response_recorded"
)
# Step 2: Publish event to message broker for downstream processing
self.message_broker.publish(
"atp.responses",
{
"transaction_id": transaction_id,
"notification_id": notification_id,
"action_id": action_id,
"response_data": response_data,
"timestamp": datetime.now().isoformat()
}
)
self.record_transaction_step(
transaction_id,
notification_id,
"event_published"
)
# Step 3: Update the notification status on ATP server
self.atp_client.update_notification_status(
notification_id,
status="received",
metadata={"transaction_id": transaction_id}
)
self.record_transaction_step(
transaction_id,
notification_id,
"atp_status_updated"
)
# Mark transaction as complete
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_completed"
)
return {
"status": "success",
"transaction_id": transaction_id
}
except Exception as e:
# Record failure
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_failed",
error_message=str(e)
)
# Publish failure event for recovery
self.message_broker.publish(
"atp.transaction.failures",
{
"transaction_id": transaction_id,
"notification_id": notification_id,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
)
raise
def record_transaction_step(self, transaction_id, notification_id, step, error_message=None):
"""Record a step in the transaction process"""
self.db_conn.execute(
"""
INSERT INTO transaction_logs
(transaction_id, notification_id, step, error_message, timestamp)
VALUES (%s, %s, %s, %s, %s)
""",
(transaction_id, notification_id, step, error_message, datetime.now())
)
def recover_transaction(self, transaction_id):
"""Attempt to recover a failed transaction"""
# Get transaction details
result = self.db_conn.execute(
"SELECT * FROM transaction_logs WHERE transaction_id = %s ORDER BY timestamp",
(transaction_id,)
)
steps = result.fetchall()
if not steps:
raise ValueError(f"No transaction logs found for {transaction_id}")
# Determine last successful step
last_step = steps[-1]['step']
notification_id = steps[0]['notification_id']
# Recovery logic based on where the transaction failed
if last_step == "response_recorded":
# Failed after recording response, retry publishing
# ...
pass
elif last_step == "event_published":
# Failed after publishing, retry ATP status update
# ...
pass
elif last_step == "transaction_failed":
# Complete failure, manual intervention required
# ...
pass
Middleware Integration
Create middleware that integrates ATP seamlessly into your existing application architecture.Express.js Middleware
Copy
// ATP middleware for Express.js
const crypto = require('crypto');
function atpMiddleware(options = {}) {
const webhookSecret = options.webhookSecret;
const atpClient = options.atpClient;
const path = options.path || '/atp/webhook';
if (!webhookSecret) {
throw new Error('ATP webhook secret is required');
}
if (!atpClient) {
throw new Error('ATP client is required');
}
return function(req, res, next) {
// Only handle ATP webhook requests
if (req.path !== path || req.method !== 'POST') {
return next();
}
// Verify the request is from ATP
const signature = req.headers['x-atp-signature'];
const payload = req.body;
if (!verifySignature(payload, signature, webhookSecret)) {
return res.status(401).json({ error: 'Invalid signature' });
}
// Add ATP-specific properties to the request
req.atp = {
notification_id: payload.notification_id,
action_id: payload.action_id,
response_data: payload.response_data,
timestamp: payload.timestamp,
client: atpClient,
// Helper method to update notification status
updateStatus: async (status, metadata) => {
return atpClient.updateNotificationStatus(
payload.notification_id,
status,
metadata
);
}
};
// Continue processing
next();
};
}
// Helper function to verify ATP signature
function verifySignature(payload, signature, secret) {
if (!payload || !signature || !secret) {
return false;
}
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(JSON.stringify(payload, Object.keys(payload).sort()))
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(expectedSignature),
Buffer.from(signature)
);
}
// Usage
const express = require('express');
const { ATPClient } = require('@atp/client');
const app = express();
app.use(express.json());
const atpClient = new ATPClient({
serviceId: 'your-service-id',
apiKey: process.env.ATP_API_KEY
});
// Register ATP middleware
app.use(atpMiddleware({
webhookSecret: process.env.ATP_WEBHOOK_SECRET,
atpClient: atpClient,
path: '/webhooks/atp'
}));
// ATP webhook handler
app.post('/webhooks/atp', (req, res) => {
const { notification_id, action_id, response_data, updateStatus } = req.atp;
// Process the response
handleResponse(notification_id, action_id, response_data)
.then(async () => {
// Update status
await updateStatus('processed');
res.status(200).json({ status: 'success' });
})
.catch(err => {
console.error('Error processing response:', err);
res.status(500).json({ error: 'Failed to process response' });
});
});
function handleResponse(notificationId, actionId, responseData) {
// Your business logic here
return Promise.resolve();
}
Django Middleware
Copy
from django.utils.deprecation import MiddlewareMixin
import hmac
import hashlib
import json
class ATPMiddleware(MiddlewareMixin):
def __init__(self, get_response):
super().__init__(get_response)
from django.conf import settings
self.webhook_path = getattr(settings, 'ATP_WEBHOOK_PATH', '/api/atp/webhook/')
self.webhook_secret = getattr(settings, 'ATP_WEBHOOK_SECRET', None)
if not self.webhook_secret:
raise ValueError("ATP_WEBHOOK_SECRET must be set in Django settings")
def process_view(self, request, view_func, view_args, view_kwargs):
# Only process ATP webhook requests
if request.path != self.webhook_path or request.method != 'POST':
return None
# Verify signature
try:
signature = request.headers.get('X-ATP-Signature')
payload = json.loads(request.body)
if not self.verify_signature(payload, signature):
return JsonResponse({'error': 'Invalid signature'}, status=401)
# Add ATP data to request
request.atp = {
'notification_id': payload.get('notification_id'),
'action_id': payload.get('action_id'),
'response_data': payload.get('response_data'),
'raw_payload': payload,
}
except (json.JSONDecodeError, ValueError) as e:
return JsonResponse({'error': f'Invalid request: {str(e)}'}, status=400)
return None # Continue to view
def verify_signature(self, payload, signature):
if not payload or not signature:
return False
expected_signature = hmac.new(
self.webhook_secret.encode(),
json.dumps(payload, sort_keys=True).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected_signature, signature)
Verification: Knowledge Check
Before implementing these advanced patterns, verify your understanding:Next Steps
Now that you understand advanced ATP patterns, explore other ATP client guides:Processing Responses
Learn how to securely handle and process user responses to your ATP notifications
Creating Notifications
Discover how to craft effective notifications that engage users