import hashlib
import hmac
import os
import threading
import time
from datetime import datetime, timedelta

from atp import ATPClient, TriageNotification, Action, ResponseType
from flask import Flask, request, jsonify
# Initialize the ATP client.
# The API key and webhook URL are read from the environment so a real key
# never has to be committed to source control. The literals below are the
# previously hard-coded values and are used only when the corresponding
# environment variable is unset.
client = ATPClient(
    api_key=os.environ.get("ATP_API_KEY", "sk_live_service_abc123"),
    webhook_url=os.environ.get(
        "ATP_WEBHOOK_URL", "https://your-service.com/atp/webhook"
    ),
)
# Create and send a notification
def send_database_query_notification():
    """Send the "Approve Database Query" notification and start polling for a reply.

    Builds a ``TriageNotification`` offering three actions (approve, deny with
    a free-text reason, schedule for a later date/time), sends it through the
    module-level ``client``, prints the assigned ID and starts a background
    thread running ``poll_notification_status`` for that ID.

    Returns:
        The ``notification_id`` reported by ``client.send_notification``.
    """
    # Take one timestamp so the scheduling window and the deadline are all
    # measured from the same instant instead of three slightly different ones.
    now = datetime.now()

    notification = TriageNotification(
        title="Approve Database Query",
        description=(
            "\n"
            "The system needs to run a large query that will analyze all customer records.\n"
            "This query may take up to 30 minutes and could impact database performance.\n"
            "Please approve or deny this operation.\n"
        ),
        actions=[
            Action(
                id="approve",
                label="Approve Query",
                response_type=ResponseType.SIMPLE,
            ),
            Action(
                id="deny",
                label="Deny",
                response_type=ResponseType.TEXT,
                constraints={"placeholder": "Reason for denial"},
            ),
            Action(
                id="schedule",
                label="Schedule for Later",
                response_type=ResponseType.DATETIME,
                # Allow scheduling from now up to one week ahead.
                constraints={
                    "min": now.isoformat(),
                    "max": (now + timedelta(days=7)).isoformat(),
                },
            ),
        ],
        # The notification must be answered within one hour.
        deadline=now + timedelta(hours=1),
        metadata={
            "query_id": "analyze-customers-2023-q4",
            "estimated_duration": "30min",
            "estimated_rows": "1.2M",
            "requested_by": "data-analytics-team",
        },
    )

    # Send the notification
    response = client.send_notification(notification)
    notification_id = response.notification_id
    print(f"Notification sent! ID: {notification_id}")

    # Start a background thread to poll for status
    threading.Thread(
        target=poll_notification_status, args=(notification_id,)
    ).start()
    return notification_id
# Poll for notification status
def poll_notification_status(notification_id, poll_interval=300, timeout=None):
    """Poll ``client`` until the notification has a response, then act on it.

    Each iteration fetches the status, prints its state and, once
    ``status.responded`` is truthy, dispatches on ``status.action_id``:
    ``approve`` runs the query, ``deny`` prints the reason and ``schedule``
    schedules the query for the time in ``status.response_data``.

    Args:
        notification_id: ID passed to ``client.get_notification_status``.
        poll_interval: Seconds to sleep between polls. Defaults to 300
            (five minutes), the previously hard-coded interval.
        timeout: Maximum number of seconds to keep polling, or ``None``
            (the default) to keep polling until a response arrives.

    Returns:
        ``True`` once a response has been seen and handled, ``False`` if
        ``timeout`` elapsed first.
    """
    # NOTE(review): handle_webhook reacts to the same actions; if both paths
    # are active for one notification the query could be started twice.
    # Confirm whether that is intended.
    give_up_at = None if timeout is None else time.monotonic() + timeout

    while True:
        status = client.get_notification_status(notification_id)
        print(f"Notification status: {status.state}")

        if status.responded:
            print(f"Action selected: {status.action_id}")
            if status.action_id == 'approve':
                run_database_query(status.metadata['query_id'])
                print("Query was approved and started!")
            elif status.action_id == 'deny':
                print(f"Query was denied. Reason: {status.response_data}")
            elif status.action_id == 'schedule':
                schedule_query(status.metadata['query_id'], status.response_data)
                print(f"Query was scheduled for: {status.response_data}")
            return True

        if give_up_at is None:
            # No time limit: wait a full interval and check again.
            time.sleep(poll_interval)
            continue

        remaining = give_up_at - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the time limit; one last poll happens right at it.
        time.sleep(min(poll_interval, remaining))
# Flask application that serves the ATP webhook endpoint
app = Flask(import_name=__name__)
@app.route('/atp/webhook', methods=['POST'])
def handle_webhook():
    """Verify and process an ATP webhook delivery.

    The raw request body is authenticated with an HMAC-SHA256 digest keyed by
    ``client.webhook_secret`` and compared against the ``X-ATP-Signature``
    header. On success the JSON payload's ``action_id`` selects what happens:
    ``approve`` runs the query, ``deny`` prints the reason and ``schedule``
    schedules the query. Any other action is acknowledged without further work.

    Returns:
        A ``(json, status)`` pair: 403 when the signature is missing or does
        not match, 400 when the body is not a JSON object or lacks the
        ``metadata.query_id`` that the action needs, otherwise 200.
    """
    # Verify webhook signature
    payload = request.data
    signature = request.headers.get('X-ATP-Signature')
    # A missing header would reach compare_digest() as None and raise
    # TypeError (an HTTP 500); treat it as a failed verification instead.
    if not signature:
        return jsonify({'error': 'Invalid signature'}), 403

    expected_signature = hmac.new(
        client.webhook_secret.encode('utf-8'),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a caller-supplied header may have.
    if not hmac.compare_digest(
        expected_signature.encode('utf-8'), signature.encode('utf-8')
    ):
        return jsonify({'error': 'Invalid signature'}), 403

    # Process the webhook payload; silent=True yields None instead of raising
    # when the body is not valid JSON.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid payload'}), 400

    action_id = data.get('action_id')
    response_data = data.get('response_data')
    metadata = data.get('metadata')

    # Handle different actions
    if action_id in ('approve', 'schedule'):
        # Both of these actions need the query identifier from the metadata.
        query_id = metadata.get('query_id') if isinstance(metadata, dict) else None
        if query_id is None:
            return jsonify({'error': 'Missing metadata.query_id'}), 400
        if action_id == 'approve':
            run_database_query(query_id)
            print("Query approved and started!")
        else:
            schedule_query(query_id, response_data)
            print(f"Query scheduled for: {response_data}")
    elif action_id == 'deny':
        print(f"Query denied. Reason: {response_data}")

    # Acknowledge receipt of the webhook
    return jsonify({'status': 'success'}), 200
# Example implementation functions
def run_database_query(query_id):
    """Placeholder: announce that the query identified by ``query_id`` is running."""
    # The real query execution belongs here; for now only report it.
    message = f"Running query {query_id}..."
    print(message)
def schedule_query(query_id, scheduled_time):
    """Placeholder: announce that ``query_id`` is being scheduled for ``scheduled_time``."""
    # The real scheduling logic belongs here; for now only report it.
    message = f"Scheduling query {query_id} for {scheduled_time}..."
    print(message)
if __name__ == "__main__":
    # Send the approval request first; this also starts the polling thread.
    notification_id = send_database_query_notification()
    # Then block while serving webhook callbacks on all interfaces, port 3000.
    app.run(port=3000, host="0.0.0.0")