When users interact with your ATP notifications, their responses need to be captured, validated, and processed securely. This guide covers everything you need to know about handling responses in your ATP integration.
# Example webhook handler in Python using Flask
from flask import Flask, request, jsonify
import hmac
import hashlib
import json

app = Flask(__name__)


@app.route('/atp/webhook', methods=['POST'])
def handle_atp_webhook():
    """Receive an ATP webhook, authenticate it, and dispatch the response.

    Returns:
        A ``(json_body, status_code)`` tuple: 400 for a malformed payload,
        401 for a missing or invalid signature, and 200 once the response
        has been handed to ``process_response``.
    """
    # silent=True yields None instead of raising on a non-JSON body, so a
    # malformed request gets the explicit 400 below.
    payload = request.get_json(silent=True)
    signature = request.headers.get('X-ATP-Signature')

    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid payload"}), 400

    # Verify the signature (covered in Security section). A missing header
    # is rejected here rather than being passed on as None.
    if not signature or not verify_signature(payload, signature):
        return jsonify({"error": "Invalid signature"}), 401

    # Extract the fields up front: a missing key is a client error (400),
    # not an unhandled KeyError that would surface as a 5XX.
    try:
        notification_id = payload['notification_id']
        action_id = payload['action_id']
        response_data = payload['response_data']
    except KeyError as missing:
        return jsonify({"error": f"Missing field: {missing.args[0]}"}), 400

    # Handle the response based on your business logic
    process_response(notification_id, action_id, response_data)

    return jsonify({"status": "success"}), 200


def process_response(notification_id, action_id, response_data):
    """Apply your business logic to a verified response (placeholder)."""
    # Your business logic here
    # Example: Update a database, trigger an action, etc.
    pass
Always verify that incoming webhooks are actually from the ATP server:
def verify_signature(payload, signature):
    """Check that ``signature`` is the HMAC-SHA256 of ``payload``.

    Args:
        payload: The parsed JSON payload. It is re-serialized with sorted
            keys before being signed.
        signature: The hex digest supplied with the request; may be None
            when the header is absent.

    Returns:
        True when the signature matches, False otherwise (including when
        ``signature`` is missing or not a string).
    """
    # A missing header arrives as None; reject it instead of letting
    # compare_digest raise TypeError.
    if not isinstance(signature, str):
        return False

    # Your webhook secret from ATP service registration
    secret = "your_webhook_secret"

    # Create a signature using the payload and your secret
    expected_signature = hmac.new(
        secret.encode(),
        json.dumps(payload, sort_keys=True).encode(),
        hashlib.sha256,
    ).hexdigest()

    # Timing-safe comparison. Compare as bytes: compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, which an
    # attacker-controlled header could otherwise use to crash the handler.
    return hmac.compare_digest(
        expected_signature.encode(),
        signature.encode("utf-8", "replace"),
    )
Always validate the structure and content of responses:
def validate_response(response_data, expected_type, required_fields=()):
    """Validate the structure and content of a response.

    Args:
        response_data: The response value taken from the webhook payload.
        expected_type: One of "simple", "text" or "form".
        required_fields: Keys that must be present in a "form" response.
            Defaults to no required keys; pass the fields from your form
            definition.

    Returns:
        True when ``response_data`` is acceptable for ``expected_type``,
        False otherwise (including for an unknown ``expected_type``).
    """
    if expected_type == "simple":
        return True  # Simple responses don't need validation

    if expected_type == "text":
        # Text must be a string of at most 10000 characters.
        return isinstance(response_data, str) and len(response_data) <= 10000

    if expected_type == "form":
        if not isinstance(response_data, dict):
            return False
        # Check required fields based on your form definition
        return all(k in response_data for k in required_fields)

    return False  # Unknown response type
def handle_text_response(notification_id, action_id, response_data):
    """Forward a free-text response to the text-input processor.

    For text responses, ``response_data`` is the user-provided text itself.
    ``action_id`` is accepted but not used here.
    """
    process_user_text_input(notification_id, response_data)
def handle_form_response(notification_id, action_id, response_data):
    """Check a form response for its required fields, then process it.

    For form responses, ``response_data`` is the structured form data. If
    any of "name", "email" or "reason" is absent, an error is logged and the
    submission is dropped. ``action_id`` is accepted but not used here.
    """
    mandatory = ["name", "email", "reason"]

    # Bail out as soon as one mandatory key is absent.
    if any(key not in response_data for key in mandatory):
        log_error(f"Missing required fields for notification {notification_id}")
        return

    process_form_submission(notification_id, response_data)
def update_task_state(notification_id, new_state):
    """Store the task's new state, then mark the notification as processed.

    Args:
        notification_id: Identifies the task row to update.
        new_state: The value written to the task's ``state`` column.
    """
    # Update your database or state management system
    statement = "UPDATE tasks SET state = ? WHERE notification_id = ?"
    bindings = (new_state, notification_id)
    db.execute(statement, bindings)

    # Optional: Update the notification status on the ATP server
    atp_client.update_notification_status(notification_id, status="processed")
def process_response(notification_id, action_id, response_data):
    """Run the business logic for a response and propagate the new state.

    Args:
        notification_id: Identifier used to look up the original task.
        action_id: The action chosen by the user; "approve" and "reject"
            are handled, anything else is logged and ignored.
        response_data: Extra data sent with the response. A rejection
            reason is read from it when it is a dict.
    """
    # Retrieve the original task context
    task = db.get_task_by_notification_id(notification_id)
    if not task:
        log_error(f"Unknown notification ID: {notification_id}")
        return

    # Execute business logic based on the response
    if action_id == "approve":
        execute_approved_action(task)
        new_state = "approved"
    elif action_id == "reject":
        # response_data is not guaranteed to be a dict (e.g. a plain text
        # or empty response), so only look up the reason when it is one.
        reason = response_data.get("reason", "") if isinstance(response_data, dict) else ""
        handle_rejection(task, reason)
        new_state = "rejected"
    else:
        # No state change happened, so there is nothing to announce.
        log_error(f"Unhandled action {action_id!r} for notification {notification_id}")
        return

    update_task_state(notification_id, new_state)

    # Notify relevant systems about the state change. Pass the state that
    # was just written: `task` was loaded before the update, so its own
    # `state` attribute still holds the previous value.
    trigger_system_updates(task, new_state=new_state)
def handle_webhook_with_retry():
    """Skeleton handler that reports temporary failures as retryable.

    Returns:
        A ``(json_body, 503)`` tuple when a temporary database or
        external-service error occurs while processing the webhook.
    """
    try:
        # Process the webhook...
        # Placeholder: a `try` block needs at least one statement to be
        # valid Python. Replace with your processing and success response.
        ...
    except (DatabaseConnectionError, ExternalServiceError) as e:
        # Log the error
        log_error(f"Temporary error processing webhook: {e!s}")

        # Return a 5XX status to trigger a retry from the ATP server
        return jsonify({"error": "Service temporarily unavailable"}), 503
def verify_response_timestamp(payload, max_age_seconds=300):
    """Check that the payload's timestamp is close to the current time.

    Args:
        payload: The webhook payload; its 'timestamp' entry is expected to
            be an ISO 8601 string with a UTC offset or a trailing 'Z'.
        max_age_seconds: Allowed distance from now, in seconds, in either
            direction. Defaults to 300 (5 minutes).

    Returns:
        True when the timestamp is within the window; False when it is
        missing, not a string, unparseable, lacks a UTC offset, or lies
        outside the window (too old or too far in the future).
    """
    # Imported locally so the snippet is self-contained.
    from datetime import datetime, timezone

    # Get the timestamp from the payload
    timestamp = payload.get('timestamp')
    if not timestamp or not isinstance(timestamp, str):
        return False

    # Convert to datetime; a malformed value is rejected, not raised.
    try:
        timestamp_dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    except ValueError:
        return False

    # A naive timestamp cannot be compared with an aware "now" (TypeError),
    # and its intended zone is unknown, so reject it.
    if timestamp_dt.utcoffset() is None:
        return False

    now = datetime.now(timezone.utc)
    time_diff = now - timestamp_dt

    # Use the absolute difference so far-future timestamps fail as well.
    return abs(time_diff.total_seconds()) < max_age_seconds
def verify_notification_id(notification_id):
    """Return True when ``notification_id`` appears in ``sent_notifications``.

    Used to check that a response refers to a notification your service
    actually sent.
    """
    query = "SELECT COUNT(*) FROM sent_notifications WHERE id = ?"
    cursor = db.execute(query, (notification_id,))
    row = cursor.fetchone()
    # The first column of the fetched row is the COUNT(*) value.
    return row[0] > 0
def correlate_response(notification_id):
    """Look up the request that produced a notification.

    Returns:
        A dict with "user_id", "request_id", "context" and "timestamp"
        copied from the stored request, or None (after logging an error)
        when no request is found.
    """
    source_request = db.get_request_by_notification_id(notification_id)

    if source_request:
        correlation = {}
        correlation["user_id"] = source_request.user_id
        correlation["request_id"] = source_request.id
        correlation["context"] = source_request.context
        correlation["timestamp"] = source_request.timestamp
        return correlation

    log_error(f"Could not find original request for {notification_id}")
    return None
def transform_response_for_downstream(notification_id, action_id, response_data):
    """Convert an ATP response into a standardized internal event.

    Returns:
        The event dict, or None when ``correlate_response`` returns nothing
        for this notification.
    """
    correlation = correlate_response(notification_id)
    if not correlation:
        return None

    # Start with the envelope fields, then add correlation and metadata.
    event = {
        "event_type": "user_decision",
        "source": "atp",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    event["user_id"] = correlation["user_id"]
    event["request_id"] = correlation["request_id"]
    event["decision"] = action_id
    event["additional_data"] = response_data
    event["metadata"] = {
        "notification_id": notification_id,
        "original_timestamp": correlation["timestamp"],
    }
    return event