from flask import Flask, request, jsonify
import hmac
import hashlib
import json
import os
app = Flask(__name__)
WEBHOOK_SECRET = os.environ.get('ATP_WEBHOOK_SECRET')
def verify_signature(payload_body, signature_header):
# Parse the signature header
header_parts = signature_header.split(',')
timestamp_str = header_parts[0].split('=')[1]
signature = header_parts[1].split('=')[1]
# Compute expected signature
expected_signature = hmac.new(
WEBHOOK_SECRET.encode('utf-8'),
payload_body,
hashlib.sha256
).hexdigest()
# Compare signatures using constant-time comparison
return hmac.compare_digest(signature, expected_signature)
@app.route('/atp/webhook', methods=['POST'])
def handle_webhook():
# Get the raw request body for signature verification
payload_body = request.data
# Verify signature
signature = request.headers.get('X-ATP-Signature')
if not signature:
return jsonify({"error": "Missing signature header"}), 401
if not verify_signature(payload_body, signature):
return jsonify({"error": "Invalid signature"}), 401
# Parse the payload
data = request.json
notification_id = data.get('notification_id')
action_id = data.get('action_id')
response_data = data.get('response_data')
responder = data.get('responder')
try:
# Process the response based on action type
if action_id == 'approve_deployment':
# Trigger deployment
trigger_deployment(notification_id)
elif action_id == 'reject_deployment':
# Cancel deployment with reason
cancel_deployment(notification_id, response_data)
else:
print(f"Unknown action_id: {action_id}")
# Respond with success
return jsonify({"status": "processed"}), 200
except Exception as e:
# Determine if this error is retriable
retriable = getattr(e, 'retriable', True)
# Respond with error
return jsonify({
"code": getattr(e, 'code', 'INTERNAL_ERROR'),
"message": str(e),
"user_message": "The system encountered an error processing your response.",
"retriable": retriable
}), 500
if __name__ == '__main__':
app.run(port=3000)