Documentation Index
Fetch the complete documentation index at: https://atp.hypertext.studio/llms.txt
Use this file to discover all available pages before exploring further.
Advanced Integration Patterns
Once you’ve mastered the basics of ATP integration, these advanced patterns can help you build more robust, maintainable, and efficient implementations.The Decorator Pattern
The decorator pattern provides a clean way to separate ATP functionality from your core business logic.Python Decorator Example
import functools
from typing import Callable, Dict, Any
def notify_on_completion(notification_type: str, title_template: str):
"""
Decorator that sends an ATP notification after the function completes.
Args:
notification_type: The type of notification to send
title_template: A template string for the notification title
"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Execute the original function
result = func(*args, **kwargs)
# Extract context for the notification
context = {
"function_name": func.__name__,
"args": args,
"kwargs": kwargs,
"result": result
}
# Create notification title from template
title = title_template.format(**context)
# Send ATP notification
send_atp_notification(
notification_type=notification_type,
title=title,
context=context
)
return result
return wrapper
return decorator
# Usage example
@notify_on_completion(
notification_type="task_completed",
title_template="Task {function_name} completed successfully"
)
def process_order(order_id: str, user_id: str):
# Process the order...
return {"status": "completed", "order_id": order_id}
TypeScript Decorator Example
import { ATPClient } from '@atp/client';
// Notification decorator factory
function NotifyOnCompletion(notificationType: string, titleTemplate: string) {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor
) {
const originalMethod = descriptor.value;
descriptor.value = async function (...args: any[]) {
// Call the original method
const result = await originalMethod.apply(this, args);
// Extract context for the notification
const context = {
functionName: propertyKey,
args,
result
};
// Create notification title from template
const title = titleTemplate.replace(/{(\w+)}/g, (_, key) =>
context[key] !== undefined ? context[key] : '{' + key + '}'
);
// Send ATP notification
const atpClient = new ATPClient();
await atpClient.sendNotification({
type: notificationType,
title,
context
});
return result;
};
return descriptor;
};
}
// Usage example
class OrderService {
@NotifyOnCompletion('task_completed', 'Order {args[0]} processed successfully')
async processOrder(orderId: string, userId: string) {
// Process the order...
return { status: 'completed', orderId };
}
}
Benefits of the Decorator Approach
- Separation of Concerns: Keeps notification logic separate from core business logic
- Reusability: Apply the same notification patterns across different functions
- Configurability: Customize notifications without changing the underlying code
- Testability: Test your core logic independently from ATP integrations
Polling vs. Webhooks
ATP supports both polling and webhook mechanisms for receiving responses. Here’s how to implement each efficiently:Webhook Implementation
# Webhook handler with advanced features
from flask import Flask, request, jsonify
import hmac
import hashlib
import json
import redis
app = Flask(__name__)
redis_client = redis.Redis()
@app.route('/atp/webhook', methods=['POST'])
def handle_atp_webhook():
# Get the webhook payload
payload = request.json
signature = request.headers.get('X-ATP-Signature')
# Verify the signature
if not verify_signature(payload, signature):
return jsonify({"error": "Invalid signature"}), 401
# Check for duplicate webhook (idempotency)
webhook_id = payload.get('webhook_id')
if webhook_id and redis_client.exists(f"processed_webhook:{webhook_id}"):
return jsonify({"status": "already processed"}), 200
# Process the webhook
try:
notification_id = payload['notification_id']
action_id = payload['action_id']
response_data = payload['response_data']
process_response(notification_id, action_id, response_data)
# Mark webhook as processed
if webhook_id:
redis_client.setex(f"processed_webhook:{webhook_id}", 86400, "1")
return jsonify({"status": "success"}), 200
except Exception as e:
# Log the error but return 200 to prevent retries if we've seen it
if webhook_id:
redis_client.setex(f"processed_webhook:{webhook_id}", 86400, "error")
return jsonify({"status": "error", "message": str(e)}), 500
Polling Implementation
import { ATPClient } from '@atp/client';
class ResponsePoller {
private atpClient: ATPClient;
private pollingInterval: number; // milliseconds
private isPolling: boolean = false;
private intervalId?: NodeJS.Timeout;
private notificationIds: string[] = [];
constructor(atpClient: ATPClient, pollingInterval: number = 60000) {
this.atpClient = atpClient;
this.pollingInterval = pollingInterval;
}
// Add a notification to poll for
public addNotification(notificationId: string): void {
if (!this.notificationIds.includes(notificationId)) {
this.notificationIds.push(notificationId);
// Start polling if not already started
if (!this.isPolling) {
this.startPolling();
}
}
}
// Remove a notification from polling
public removeNotification(notificationId: string): void {
this.notificationIds = this.notificationIds.filter(id => id !== notificationId);
// Stop polling if no more notifications to poll
if (this.notificationIds.length === 0 && this.isPolling) {
this.stopPolling();
}
}
// Start the polling process
private startPolling(): void {
if (this.isPolling) return;
this.isPolling = true;
this.poll(); // Poll immediately
this.intervalId = setInterval(() => this.poll(), this.pollingInterval);
}
// Stop the polling process
private stopPolling(): void {
if (!this.isPolling) return;
this.isPolling = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = undefined;
}
}
// Poll for responses
private async poll(): Promise<void> {
if (this.notificationIds.length === 0) return;
try {
// Request responses for multiple notifications in batch
const responses = await this.atpClient.getResponses(this.notificationIds);
// Process each response
for (const response of responses) {
// Process based on status
if (response.status === 'responded') {
this.processResponse(response);
this.removeNotification(response.notification_id);
} else if (response.status === 'expired') {
this.handleExpiredNotification(response.notification_id);
this.removeNotification(response.notification_id);
}
}
} catch (error) {
console.error('Polling error:', error);
// Implement exponential backoff or other error handling
}
}
private processResponse(response: any): void {
// Process the response based on your business logic
const { notification_id, action_id, response_data } = response;
// Handle the response...
console.log(`Processed response for notification ${notification_id}`);
// Notify your application of the response
// e.g., emit an event, call a callback, etc.
}
private handleExpiredNotification(notificationId: string): void {
// Handle expired notifications
console.log(`Notification ${notificationId} expired without response`);
// Notify your application that the notification expired
}
}
// Usage
const atpClient = new ATPClient({/* config */});
const poller = new ResponsePoller(atpClient, 30000); // Poll every 30 seconds
// When sending a notification
const notificationId = await atpClient.sendNotification({/* notification */});
poller.addNotification(notificationId);
Choosing Between Polling and Webhooks
| Approach | Advantages | Disadvantages | Best For |
|---|---|---|---|
| Webhooks | - Real-time updates - Lower latency - Reduced server load | - Requires public endpoint - More complex setup - Needs retry handling | Services with public endpoints that need immediate responses |
| Polling | - Works behind firewalls - Simpler implementation - More control over timing | - Higher latency - More API calls - Potential rate limiting | Development environments, internal services, or when webhooks aren’t feasible |
Timeout Handling
Implement robust timeout mechanisms to handle cases where responses don’t arrive within expected timeframes.Deadline-Based Processing
import time
from datetime import datetime, timedelta
class NotificationManager:
def __init__(self, atp_client):
self.atp_client = atp_client
self.pending_notifications = {} # notification_id -> expiry_time
def send_notification(self, notification_data, timeout_seconds=3600):
# Send the notification
response = self.atp_client.send_notification(notification_data)
notification_id = response['notification_id']
# Set expiry time
expiry_time = datetime.now() + timedelta(seconds=timeout_seconds)
# Store in pending notifications
self.pending_notifications[notification_id] = {
'expiry_time': expiry_time,
'data': notification_data,
'timeout_seconds': timeout_seconds
}
return notification_id
def process_response(self, notification_id, action_id, response_data):
# Process the response
# ...
# Remove from pending
if notification_id in self.pending_notifications:
del self.pending_notifications[notification_id]
def check_timeouts(self):
"""Check for timed-out notifications and handle them"""
now = datetime.now()
timed_out = []
# Find expired notifications
for notification_id, info in self.pending_notifications.items():
if now > info['expiry_time']:
timed_out.append((notification_id, info))
# Process timeouts
for notification_id, info in timed_out:
self.handle_timeout(notification_id, info)
del self.pending_notifications[notification_id]
def handle_timeout(self, notification_id, info):
"""Handle a notification timeout"""
# Log the timeout
print(f"Notification {notification_id} timed out after {info['timeout_seconds']} seconds")
# Update the notification status on ATP server
self.atp_client.update_notification_status(
notification_id,
status="cancelled",
reason="Response timeout exceeded"
)
# Execute fallback logic
# e.g., make a default decision, notify admin, etc.
self.execute_timeout_fallback(notification_id, info['data'])
def execute_timeout_fallback(self, notification_id, original_data):
"""Execute fallback logic for timed out notification"""
# Implement your timeout fallback strategy
# For example:
# - Apply a default decision
# - Escalate to a human
# - Retry with a different approach
pass
def start_timeout_checker(self, check_interval=60):
"""Start a background thread that checks for timeouts"""
import threading
def check_loop():
while True:
try:
self.check_timeouts()
except Exception as e:
print(f"Error in timeout checker: {e}")
time.sleep(check_interval)
thread = threading.Thread(target=check_loop, daemon=True)
thread.start()
return thread
Timeout Strategies
| Strategy | Implementation | Use Cases |
|---|---|---|
| Default Action | After timeout, apply a conservative default decision | Approval workflows where denial is the safe default |
| Escalation | Route to human supervisor or higher-priority notification | Critical decisions requiring human judgment |
| Retry | Send a follow-up notification with urgency marker | When user response is essential |
| Graceful Degradation | Proceed with limited functionality | When system can function with reduced capabilities |
| Transaction Abortion | Cancel the entire operation | When partial completion would cause inconsistency |
Batching Techniques
Optimize performance by batching notifications and processing responses in groups.Notification Batching
// BatchManager for ATP notifications
class ATPBatchManager {
constructor(atpClient, options = {}) {
this.atpClient = atpClient;
this.batchSize = options.batchSize || 10;
this.flushInterval = options.flushInterval || 5000; // 5 seconds
this.queue = [];
this.sendPromises = new Map();
this.lastFlushed = Date.now();
// Set up periodic flushing
this.intervalId = setInterval(() => this.flushIfNeeded(), 1000);
}
async addNotification(notification) {
return new Promise((resolve, reject) => {
// Add to queue with resolve/reject handlers
this.queue.push({
notification,
resolve,
reject
});
// Flush immediately if batch size reached
if (this.queue.length >= this.batchSize) {
this.flush();
}
});
}
flushIfNeeded() {
const now = Date.now();
// Flush if interval elapsed and queue has items
if (this.queue.length > 0 && now - this.lastFlushed >= this.flushInterval) {
this.flush();
}
}
async flush() {
if (this.queue.length === 0) return;
const batch = this.queue.splice(0, this.batchSize);
this.lastFlushed = Date.now();
try {
// Create batch request
const notifications = batch.map(item => item.notification);
const results = await this.atpClient.sendBatchNotifications(notifications);
// Resolve promises with corresponding results
results.forEach((result, index) => {
if (result.success) {
batch[index].resolve(result);
} else {
batch[index].reject(new Error(result.error || 'Failed to send notification'));
}
});
} catch (error) {
// If batch request fails, reject all promises
batch.forEach(item => item.reject(error));
}
}
destroy() {
// Clean up interval on shutdown
if (this.intervalId) {
clearInterval(this.intervalId);
}
// Flush any remaining items
if (this.queue.length > 0) {
this.flush();
}
}
}
// Usage
const batchManager = new ATPBatchManager(atpClient, {
batchSize: 20,
flushInterval: 3000
});
// Send notifications through the batch manager
async function sendUserNotification(userId, message) {
try {
const result = await batchManager.addNotification({
user_id: userId,
type: 'message',
title: 'New Message',
body: message
});
return result.notification_id;
} catch (error) {
console.error('Failed to send notification:', error);
throw error;
}
}
Response Batching
class ResponseBatchProcessor:
def __init__(self, max_batch_size=100):
self.response_queue = []
self.max_batch_size = max_batch_size
self.processing = False
def add_response(self, notification_id, action_id, response_data):
"""Add a response to the processing queue"""
self.response_queue.append({
'notification_id': notification_id,
'action_id': action_id,
'response_data': response_data
})
# Process batch if we've reached max size
if len(self.response_queue) >= self.max_batch_size:
self.process_batch()
def process_batch(self):
"""Process a batch of responses"""
if self.processing or not self.response_queue:
return
self.processing = True
try:
# Get a batch of responses
batch = self.response_queue[:self.max_batch_size]
self.response_queue = self.response_queue[self.max_batch_size:]
# Group by action type for efficient processing
action_groups = {}
for response in batch:
action_id = response['action_id']
if action_id not in action_groups:
action_groups[action_id] = []
action_groups[action_id].append(response)
# Process each action group
for action_id, responses in action_groups.items():
self.process_action_group(action_id, responses)
finally:
self.processing = False
# If more items in queue, process next batch
if self.response_queue:
self.process_batch()
def process_action_group(self, action_id, responses):
"""Process a group of responses with the same action"""
# Example: Bulk database operations for the same action type
if action_id == "approve":
# Bulk approve all items
notification_ids = [r['notification_id'] for r in responses]
bulk_approve_items(notification_ids)
elif action_id == "form_submit":
# Bulk process form submissions
form_data_list = [
{'notification_id': r['notification_id'], 'data': r['response_data']}
for r in responses
]
bulk_process_form_submissions(form_data_list)
# Update all notification statuses in one call
update_notification_statuses(
[(r['notification_id'], 'processed') for r in responses]
)
# Helper functions for bulk operations
def bulk_approve_items(notification_ids):
"""Approve multiple items in a single database operation"""
# Example using SQLAlchemy
db.session.execute(
"""
UPDATE items
SET status = 'approved', updated_at = now()
WHERE notification_id IN :notification_ids
""",
{'notification_ids': tuple(notification_ids)}
)
db.session.commit()
def bulk_process_form_submissions(form_data_list):
"""Process multiple form submissions efficiently"""
# Prepare data for bulk insert
values = [
{
'notification_id': item['notification_id'],
'form_data': json.dumps(item['data']),
'processed_at': datetime.now()
}
for item in form_data_list
]
# Bulk insert
db.session.execute(
"""
INSERT INTO form_submissions (notification_id, form_data, processed_at)
VALUES (:notification_id, :form_data, :processed_at)
""",
values
)
db.session.commit()
Response Verification Patterns
Implement robust verification for all incoming responses with these patterns.Multi-Factor Verification
class ResponseVerifier:
def __init__(self, atp_client, webhook_secret):
self.atp_client = atp_client
self.webhook_secret = webhook_secret
def verify_response(self, payload, signature, request_headers):
"""
Comprehensive verification of an ATP response
"""
# Step 1: Verify signature (cryptographic)
if not self._verify_signature(payload, signature):
raise SecurityException("Invalid signature")
# Step 2: Verify timestamp (temporal)
if not self._verify_timestamp(payload.get('timestamp')):
raise SecurityException("Response too old or from the future")
# Step 3: Verify notification ownership (application)
notification_id = payload.get('notification_id')
if not self._verify_notification_ownership(notification_id):
raise SecurityException("Unknown notification ID")
# Step 4: Verify IP address (network)
client_ip = request_headers.get('X-Forwarded-For', '').split(',')[0].strip()
if not self._verify_ip_address(client_ip):
raise SecurityException("Unauthorized IP address")
# Step 5: Verify rate limits (behavioral)
if not self._verify_rate_limits(notification_id):
raise SecurityException("Rate limit exceeded")
# All verifications passed
return True
def _verify_signature(self, payload, signature):
"""Verify HMAC signature"""
expected_signature = hmac.new(
self.webhook_secret.encode(),
json.dumps(payload, sort_keys=True).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected_signature, signature)
def _verify_timestamp(self, timestamp_str, max_age_seconds=300):
"""Verify timestamp is recent"""
if not timestamp_str:
return False
try:
# Parse ISO8601 timestamp
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
# Check if timestamp is in acceptable range
now = datetime.now(timezone.utc)
time_diff = abs((now - timestamp).total_seconds())
return time_diff <= max_age_seconds
except (ValueError, TypeError):
return False
def _verify_notification_ownership(self, notification_id):
"""Verify this notification was sent by this service"""
# Check against database of sent notifications
result = db.execute(
"SELECT COUNT(*) FROM sent_notifications WHERE id = ?",
(notification_id,)
)
return result.fetchone()[0] > 0
def _verify_ip_address(self, client_ip):
"""Verify IP is from trusted ATP servers"""
# Check against allowlist of ATP server IPs
trusted_ips = [
'192.0.2.1', # Example ATP server IPs
'192.0.2.2',
# ...
]
return client_ip in trusted_ips
def _verify_rate_limits(self, notification_id):
"""Verify not exceeding rate limits for responses"""
# Check recent responses count in Redis
key = f"response_count:{notification_id}"
count = redis_client.get(key)
if count and int(count) > 5: # Example: max 5 responses per notification
return False
# Increment counter with expiry
redis_client.incr(key)
redis_client.expire(key, 3600) # 1 hour expiry
return True
Dual Verification with ATP Server
import { ATPClient } from '@atp/client';
class DualVerificationHandler {
private atpClient: ATPClient;
private webhookSecret: string;
constructor(atpClient: ATPClient, webhookSecret: string) {
this.atpClient = atpClient;
this.webhookSecret = webhookSecret;
}
async verifyAndProcessWebhook(payload: any, signature: string): Promise<boolean> {
// Step 1: Local verification
if (!this.verifySignature(payload, signature)) {
throw new Error('Invalid signature');
}
const { notification_id, action_id, response_data } = payload;
// Step 2: Secondary verification with ATP server
try {
// Verify response is legitimate by querying the ATP server directly
const verificationResult = await this.atpClient.verifyResponse({
notification_id,
action_id,
verification_token: payload.verification_token
});
if (!verificationResult.valid) {
throw new Error('Response verification failed');
}
// Process the verified response
await this.processVerifiedResponse(notification_id, action_id, response_data);
return true;
} catch (error) {
console.error('Verification failed:', error);
return false;
}
}
private verifySignature(payload: any, signature: string): boolean {
// Implementation of signature verification
// ...
return true; // Placeholder
}
private async processVerifiedResponse(
notificationId: string,
actionId: string,
responseData: any
): Promise<void> {
// Process the verified response
// ...
}
}
Reliable Transaction Processing
Ensure response processing is reliable with these transactional patterns.Response Processing Saga
import java.util.UUID;
public class ResponseProcessor {
private final Database db;
private final ATPClient atpClient;
public ResponseProcessor(Database db, ATPClient atpClient) {
this.db = db;
this.atpClient = atpClient;
}
public void processResponse(String notificationId, String actionId, Map<String, Object> responseData) {
// Create a unique processing ID for tracking
String processingId = UUID.randomUUID().toString();
try {
// Start a processing record with status
db.executeUpdate(
"INSERT INTO response_processing (id, notification_id, status) VALUES (?, ?, ?)",
processingId, notificationId, "started"
);
// Step 1: Record the response
db.executeUpdate(
"INSERT INTO notification_responses (notification_id, action_id, response_data, received_at) VALUES (?, ?, ?, NOW())",
notificationId, actionId, JSON.stringify(responseData)
);
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"recorded", processingId
);
// Step 2: Execute business logic
boolean success = executeBusinessLogic(notificationId, actionId, responseData);
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"business_logic_executed", processingId
);
// Step 3: Update notification status on ATP server
atpClient.updateNotificationStatus(notificationId, "processed");
db.executeUpdate(
"UPDATE response_processing SET status = ? WHERE id = ?",
"completed", processingId
);
} catch (Exception e) {
// Log the error
logger.error("Error processing response: " + e.getMessage(), e);
// Update processing status to failed
try {
db.executeUpdate(
"UPDATE response_processing SET status = ?, error_message = ? WHERE id = ?",
"failed", e.getMessage(), processingId
);
} catch (Exception ex) {
logger.error("Failed to update processing status", ex);
}
// Trigger recovery process if needed
triggerRecoveryProcess(processingId);
// Throw the exception to notify the webhook handler
throw e;
}
}
private boolean executeBusinessLogic(String notificationId, String actionId, Map<String, Object> responseData) {
// Implement your business logic based on the response
// ...
return true;
}
private void triggerRecoveryProcess(String processingId) {
// Schedule a recovery job
recoveryService.scheduleRecovery(processingId);
}
}
Distributed Transaction Handling
For complex workflows that span multiple services:import uuid
from datetime import datetime
import json
class DistributedTransactionManager:
def __init__(self, db_conn, message_broker, atp_client):
self.db_conn = db_conn
self.message_broker = message_broker
self.atp_client = atp_client
def process_response(self, notification_id, action_id, response_data):
# Create transaction ID
transaction_id = str(uuid.uuid4())
# Record transaction start
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_started"
)
try:
# Step 1: Record the response locally
self.db_conn.execute(
"""
INSERT INTO notification_responses
(notification_id, action_id, response_data, transaction_id, received_at)
VALUES (%s, %s, %s, %s, %s)
""",
(notification_id, action_id, json.dumps(response_data),
transaction_id, datetime.now())
)
self.record_transaction_step(
transaction_id,
notification_id,
"response_recorded"
)
# Step 2: Publish event to message broker for downstream processing
self.message_broker.publish(
"atp.responses",
{
"transaction_id": transaction_id,
"notification_id": notification_id,
"action_id": action_id,
"response_data": response_data,
"timestamp": datetime.now().isoformat()
}
)
self.record_transaction_step(
transaction_id,
notification_id,
"event_published"
)
# Step 3: Update the notification status on ATP server
self.atp_client.update_notification_status(
notification_id,
status="received",
metadata={"transaction_id": transaction_id}
)
self.record_transaction_step(
transaction_id,
notification_id,
"atp_status_updated"
)
# Mark transaction as complete
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_completed"
)
return {
"status": "success",
"transaction_id": transaction_id
}
except Exception as e:
# Record failure
self.record_transaction_step(
transaction_id,
notification_id,
"transaction_failed",
error_message=str(e)
)
# Publish failure event for recovery
self.message_broker.publish(
"atp.transaction.failures",
{
"transaction_id": transaction_id,
"notification_id": notification_id,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
)
raise
def record_transaction_step(self, transaction_id, notification_id, step, error_message=None):
"""Record a step in the transaction process"""
self.db_conn.execute(
"""
INSERT INTO transaction_logs
(transaction_id, notification_id, step, error_message, timestamp)
VALUES (%s, %s, %s, %s, %s)
""",
(transaction_id, notification_id, step, error_message, datetime.now())
)
def recover_transaction(self, transaction_id):
"""Attempt to recover a failed transaction"""
# Get transaction details
result = self.db_conn.execute(
"SELECT * FROM transaction_logs WHERE transaction_id = %s ORDER BY timestamp",
(transaction_id,)
)
steps = result.fetchall()
if not steps:
raise ValueError(f"No transaction logs found for {transaction_id}")
# Determine last successful step
last_step = steps[-1]['step']
notification_id = steps[0]['notification_id']
# Recovery logic based on where the transaction failed
if last_step == "response_recorded":
# Failed after recording response, retry publishing
# ...
pass
elif last_step == "event_published":
# Failed after publishing, retry ATP status update
# ...
pass
elif last_step == "transaction_failed":
# Complete failure, manual intervention required
# ...
pass
Middleware Integration
Create middleware that integrates ATP seamlessly into your existing application architecture.Express.js Middleware
// ATP middleware for Express.js
const crypto = require('crypto');
function atpMiddleware(options = {}) {
const webhookSecret = options.webhookSecret;
const atpClient = options.atpClient;
const path = options.path || '/atp/webhook';
if (!webhookSecret) {
throw new Error('ATP webhook secret is required');
}
if (!atpClient) {
throw new Error('ATP client is required');
}
return function(req, res, next) {
// Only handle ATP webhook requests
if (req.path !== path || req.method !== 'POST') {
return next();
}
// Verify the request is from ATP
const signature = req.headers['x-atp-signature'];
const payload = req.body;
if (!verifySignature(payload, signature, webhookSecret)) {
return res.status(401).json({ error: 'Invalid signature' });
}
// Add ATP-specific properties to the request
req.atp = {
notification_id: payload.notification_id,
action_id: payload.action_id,
response_data: payload.response_data,
timestamp: payload.timestamp,
client: atpClient,
// Helper method to update notification status
updateStatus: async (status, metadata) => {
return atpClient.updateNotificationStatus(
payload.notification_id,
status,
metadata
);
}
};
// Continue processing
next();
};
}
// Helper function to verify ATP signature
function verifySignature(payload, signature, secret) {
if (!payload || !signature || !secret) {
return false;
}
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(JSON.stringify(payload, Object.keys(payload).sort()))
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(expectedSignature),
Buffer.from(signature)
);
}
// Usage
const express = require('express');
const { ATPClient } = require('@atp/client');
const app = express();
app.use(express.json());
const atpClient = new ATPClient({
serviceId: 'your-service-id',
apiKey: process.env.ATP_API_KEY
});
// Register ATP middleware
app.use(atpMiddleware({
webhookSecret: process.env.ATP_WEBHOOK_SECRET,
atpClient: atpClient,
path: '/webhooks/atp'
}));
// ATP webhook handler
app.post('/webhooks/atp', (req, res) => {
const { notification_id, action_id, response_data, updateStatus } = req.atp;
// Process the response
handleResponse(notification_id, action_id, response_data)
.then(async () => {
// Update status
await updateStatus('processed');
res.status(200).json({ status: 'success' });
})
.catch(err => {
console.error('Error processing response:', err);
res.status(500).json({ error: 'Failed to process response' });
});
});
function handleResponse(notificationId, actionId, responseData) {
// Your business logic here
return Promise.resolve();
}
Django Middleware
from django.utils.deprecation import MiddlewareMixin
import hmac
import hashlib
import json
class ATPMiddleware(MiddlewareMixin):
def __init__(self, get_response):
super().__init__(get_response)
from django.conf import settings
self.webhook_path = getattr(settings, 'ATP_WEBHOOK_PATH', '/api/atp/webhook/')
self.webhook_secret = getattr(settings, 'ATP_WEBHOOK_SECRET', None)
if not self.webhook_secret:
raise ValueError("ATP_WEBHOOK_SECRET must be set in Django settings")
def process_view(self, request, view_func, view_args, view_kwargs):
# Only process ATP webhook requests
if request.path != self.webhook_path or request.method != 'POST':
return None
# Verify signature
try:
signature = request.headers.get('X-ATP-Signature')
payload = json.loads(request.body)
if not self.verify_signature(payload, signature):
return JsonResponse({'error': 'Invalid signature'}, status=401)
# Add ATP data to request
request.atp = {
'notification_id': payload.get('notification_id'),
'action_id': payload.get('action_id'),
'response_data': payload.get('response_data'),
'raw_payload': payload,
}
except (json.JSONDecodeError, ValueError) as e:
return JsonResponse({'error': f'Invalid request: {str(e)}'}, status=400)
return None # Continue to view
def verify_signature(self, payload, signature):
if not payload or not signature:
return False
expected_signature = hmac.new(
self.webhook_secret.encode(),
json.dumps(payload, sort_keys=True).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected_signature, signature)
Verification: Knowledge Check
Before implementing these advanced patterns, verify your understanding:Next Steps
Now that you understand advanced ATP patterns, explore other ATP client guides:Processing Responses
Learn how to securely handle and process user responses to your ATP notifications
Creating Notifications
Discover how to craft effective notifications that engage users