Kotlin Multiplatform ATP Client Examples
This document provides examples of implementing an ATP client in Kotlin Multiplatform, focusing on the core functionality for watching notifications and submitting responses.Core Client Implementation
The main ATP client class handles notification watching and response submission:Copy
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
/**
* Core ATP client for Kotlin Multiplatform
*/
class ATPClient(
private val baseUrl: String,
private val token: String
) {
// HTTP client with WebSocket support
private val httpClient = HttpClient {
install(WebSockets)
}
// Flow of incoming notifications
private val _notifications = MutableSharedFlow<Notification>(replay = 10)
val notifications = _notifications.asSharedFlow()
// Flow of status updates
private val _statusUpdates = MutableSharedFlow<StatusUpdate>(replay = 5)
val statusUpdates = _statusUpdates.asSharedFlow()
/**
* Start WebSocket connection to receive notifications
*/
fun startWatching() {
scope.launch {
try {
httpClient.webSocket(
method = HttpMethod.Get,
host = baseUrl.removePrefix("https://").removePrefix("http://"),
path = "/api/v1/client/stream?token=$token"
) {
// Send heartbeat periodically
val heartbeatJob = launch {
while (isActive) {
send(Frame.Text(Json.encodeToString(
HeartbeatAck.serializer(),
HeartbeatAck(System.currentTimeMillis().toString())
)))
delay(25000) // 25 seconds
}
}
try {
// Process incoming messages
for (frame in incoming) {
if (frame is Frame.Text) {
handleWebSocketMessage(frame.readText())
}
}
} finally {
heartbeatJob.cancel()
}
}
} catch (e: Exception) {
// Implement reconnection with exponential backoff
delay(calculateBackoffDelay(retryCount++))
if (shouldReconnect) {
startWatching()
}
}
}
}
/**
* Process WebSocket messages
*/
private suspend fun handleWebSocketMessage(messageText: String) {
val jsonObject = Json.parseToJsonElement(messageText).jsonObject
val type = jsonObject["type"]?.jsonPrimitive?.content ?: return
when (type) {
"notification" -> {
val notification = Json.decodeFromJsonElement(
Notification.serializer(),
jsonObject["data"]!!
)
_notifications.emit(notification)
}
"status_update" -> {
val statusUpdate = Json.decodeFromJsonElement(
StatusUpdate.serializer(),
jsonObject["data"]!!
)
_statusUpdates.emit(statusUpdate)
}
"heartbeat" -> {
// Respond to heartbeat in the websocket handler
}
}
}
/**
* Fetch notifications via REST API
* Use this as fallback when WebSocket isn't available
*/
suspend fun fetchPendingNotifications(
status: String = "pending",
limit: Int = 20,
cursor: String? = null
): NotificationResponse {
val response = httpClient.get("$baseUrl/api/v1/client/notifications") {
header("Authorization", "Bearer $token")
parameter("status", status)
parameter("limit", limit)
cursor?.let { parameter("cursor", it) }
}
return Json.decodeFromString(NotificationResponse.serializer(), response.bodyAsText())
}
/**
* Submit a response to a notification
*/
suspend fun respond(
notificationId: String,
actionId: String,
responseData: Any?
): ResponseResult {
// Create request body based on responseData type
val requestBody = buildJsonObject {
put("notification_id", JsonPrimitive(notificationId))
put("action_id", JsonPrimitive(actionId))
// Format response_data based on type
when (responseData) {
null -> put("response_data", JsonNull)
is Boolean -> put("response_data", JsonPrimitive(responseData))
is String -> put("response_data", JsonPrimitive(responseData))
is Number -> put("response_data", JsonPrimitive(responseData.toDouble()))
is List<*> -> {
val array = buildJsonArray {
responseData.forEach {
when (it) {
is String -> add(JsonPrimitive(it))
is Number -> add(JsonPrimitive(it.toDouble()))
is Boolean -> add(JsonPrimitive(it))
}
}
}
put("response_data", array)
}
}
}
// Send the response
val response = httpClient.post("$baseUrl/api/v1/client/respond") {
header("Authorization", "Bearer $token")
contentType(ContentType.Application.Json)
setBody(requestBody.toString())
}
// Parse the response
return Json.decodeFromString(ResponseResult.serializer(), response.bodyAsText())
}
}
Data Models
The key data models required for working with ATP:Copy
/**
* Core notification model
*/
@Serializable
data class Notification(
val id: String,
val version: String,
val timestamp: String,
val deadline: String? = null,
val service: Service,
val context: Context,
val actions: List<Action>,
val status: String? = null
)
@Serializable
data class Service(
val id: String,
val name: String,
val icon: String? = null
)
@Serializable
data class Context(
val title: String,
val description: String,
val project: String? = null,
val metadata: JsonObject? = null,
val attachments: List<Attachment>? = null
)
@Serializable
data class Attachment(
val type: String,
val description: String? = null,
val uri: String? = null,
val data: String? = null
)
@Serializable
data class Action(
val id: String,
val label: String,
val response_type: String,
val flags: List<String>? = null,
val options: JsonElement? = null,
val constraints: JsonObject? = null
)
@Serializable
data class StatusUpdate(
val notification_id: String,
val status: String,
val reason: String? = null,
val timestamp: String
)
@Serializable
data class NotificationResponse(
val notifications: List<Notification>,
val pagination: Pagination
)
@Serializable
data class Pagination(
val next_cursor: String? = null,
val has_more: Boolean,
val total_count: Int
)
@Serializable
data class ResponseResult(
val notification_id: String,
val action_id: String,
val status: String,
val responded_at: String
)
@Serializable
data class HeartbeatAck(
val timestamp: String,
val type: String = "heartbeat_ack"
)
Platform-Specific Notification Handling
Kotlin Multiplatform allows us to implement platform-specific notification handling while sharing the core ATP client:Copy
/**
* Interface for platform-specific notification handling
*/
expect class PlatformNotificationHandler {
fun showNotification(notification: Notification)
fun cancelNotification(notificationId: String)
}
/**
* Android implementation
*/
actual class PlatformNotificationHandler(
private val context: android.content.Context
) {
actual fun showNotification(notification: Notification) {
// Create and show Android notification
val notificationManager = context.getSystemService(
android.content.Context.NOTIFICATION_SERVICE
) as android.app.NotificationManager
// Create notification channel (Android O+)
if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) {
val channel = android.app.NotificationChannel(
"atp_channel",
"ATP Notifications",
android.app.NotificationManager.IMPORTANCE_DEFAULT
)
notificationManager.createNotificationChannel(channel)
}
// Create notification
val builder = android.app.Notification.Builder(context, "atp_channel")
.setContentTitle(notification.context.title)
.setContentText(notification.context.description)
.setSmallIcon(android.R.drawable.ic_dialog_info)
// Show notification
notificationManager.notify(notification.id.hashCode(), builder.build())
}
actual fun cancelNotification(notificationId: String) {
val notificationManager = context.getSystemService(
android.content.Context.NOTIFICATION_SERVICE
) as android.app.NotificationManager
notificationManager.cancel(notificationId.hashCode())
}
}
/**
* iOS implementation
*/
actual class PlatformNotificationHandler {
actual fun showNotification(notification: Notification) {
// Get notification center
val center = platform.UserNotifications.UNUserNotificationCenter.currentNotificationCenter()
// Create notification content
val content = platform.UserNotifications.UNMutableNotificationContent().apply {
setTitle(notification.context.title)
setBody(notification.context.description)
setSound(platform.UserNotifications.UNNotificationSound.defaultSound)
}
// Create notification request
val request = platform.UserNotifications.UNNotificationRequest(
notification.id,
content,
null
)
// Add notification to be delivered
center.addNotificationRequest(request, null)
}
actual fun cancelNotification(notificationId: String) {
val center = platform.UserNotifications.UNUserNotificationCenter.currentNotificationCenter()
center.removePendingNotificationRequestsWithIdentifiers(listOf(notificationId))
center.removeDeliveredNotificationsWithIdentifiers(listOf(notificationId))
}
}
Notification Manager
A class to handle notifications across platforms:Copy
/**
* Manages notifications and interactions with ATP
*/
class NotificationManager(
private val atpClient: ATPClient,
private val platformHandler: PlatformNotificationHandler
) {
// In-memory storage for notifications
private val notifications = mutableMapOf<String, Notification>()
/**
* Initialize notification handling
*/
fun initialize() {
// Collect notifications from ATP client
scope.launch {
atpClient.notifications.collect { notification ->
// Store notification
notifications[notification.id] = notification
// Show platform notification
platformHandler.showNotification(notification)
}
}
// Collect status updates
scope.launch {
atpClient.statusUpdates.collect { update ->
// Handle status changes
if (update.status == "invalidated" || update.status == "responded") {
// Remove notification from storage
notifications.remove(update.notification_id)
// Cancel platform notification
platformHandler.cancelNotification(update.notification_id)
}
}
}
// Start watching for notifications
atpClient.startWatching()
}
/**
* Submit a response to a notification
*/
suspend fun respondToNotification(
notificationId: String,
actionId: String,
responseData: Any?
): Result<ResponseResult> {
return try {
val result = atpClient.respond(notificationId, actionId, responseData)
// Remove notification from storage
notifications.remove(notificationId)
// Cancel platform notification
platformHandler.cancelNotification(notificationId)
Result.success(result)
} catch (e: Exception) {
Result.failure(e)
}
}
/**
* Get all pending notifications
*/
fun getPendingNotifications(): List<Notification> {
return notifications.values.toList()
}
/**
* Get a notification by ID
*/
fun getNotification(notificationId: String): Notification? {
return notifications[notificationId]
}
}
Alternative Polling Implementation
For environments where WebSockets aren’t available:Copy
/**
* Polls for notifications periodically
*/
class NotificationPoller(
private val atpClient: ATPClient,
private val platformHandler: PlatformNotificationHandler
) {
private var isPolling = false
private var cursor: String? = null
/**
* Start polling for notifications
*/
fun startPolling(intervalMs: Long = 30000) {
if (isPolling) return
isPolling = true
scope.launch {
while (isPolling) {
try {
// Fetch pending notifications
val response = atpClient.fetchPendingNotifications(cursor = cursor)
// Process each notification
response.notifications.forEach { notification ->
platformHandler.showNotification(notification)
}
// Update cursor for next poll
cursor = response.pagination.next_cursor
// Adaptive polling interval based on activity
val delay = if (response.notifications.isEmpty()) {
intervalMs * 2 // Slow down when no notifications
} else {
intervalMs // Regular interval when there are notifications
}
// Wait for next poll
delay(minOf(delay, 120000)) // Cap at 2 minutes
} catch (e: Exception) {
// Error backoff
delay(60000)
}
}
}
}
/**
* Stop polling
*/
fun stopPolling() {
isPolling = false
}
}
Error Handling
Robust error handling for response submission:Copy
/**
* Submit a response with retry capability
*/
suspend fun submitResponseWithRetry(
atpClient: ATPClient,
notificationId: String,
actionId: String,
responseData: Any?
): Result<ResponseResult> {
var retryCount = 0
val maxRetries = 3
while (retryCount <= maxRetries) {
try {
val result = atpClient.respond(notificationId, actionId, responseData)
return Result.success(result)
} catch (e: Exception) {
// Check if error is terminal (no retry)
if (isTerminalError(e)) {
return Result.failure(e)
}
// Retry for transient errors
if (retryCount < maxRetries) {
// Exponential backoff
val delayMs = 1000L * (1L shl retryCount)
delay(delayMs)
retryCount++
} else {
return Result.failure(e)
}
}
}
return Result.failure(Exception("Max retries exceeded"))
}
/**
* Determine if an error is terminal (shouldn't be retried)
*/
private fun isTerminalError(error: Exception): Boolean {
// Terminal states include already responded, expired, invalidated
val message = error.message?.lowercase() ?: ""
return message.contains("already_responded") ||
message.contains("expired") ||
message.contains("invalidated") ||
message.contains("invalid_response_data")
}
Integration Example
Here’s an example of integrating the ATP client into a multiplatform application:Copy
/**
* Sets up ATP for a multiplatform application
*/
fun setupATP(token: String) {
// Create ATP client
val atpClient = ATPClient(
baseUrl = "https://atp.example.com",
token = token
)
// Create platform-specific notification handler
val platformHandler = PlatformNotificationHandler()
// Create notification manager
val notificationManager = NotificationManager(atpClient, platformHandler)
// Initialize notification handling
notificationManager.initialize()
// Optional: Setup polling as fallback if WebSockets aren't reliable
val poller = NotificationPoller(atpClient, platformHandler)
// Detect if WebSocket fails and fall back to polling
scope.launch {
try {
// Attempt WebSocket connection
atpClient.startWatching()
} catch (e: Exception) {
// Fall back to polling if WebSocket fails
poller.startPolling()
}
}
}
/**
* Example of responding to a notification
*/
suspend fun respondToNotification(
notificationManager: NotificationManager,
notificationId: String,
actionId: String,
responseData: Any?
) {
try {
val result = notificationManager.respondToNotification(
notificationId = notificationId,
actionId = actionId,
responseData = responseData
)
result.onSuccess { response ->
println("Response submitted successfully: ${response.status}")
}.onFailure { error ->
println("Failed to submit response: ${error.message}")
}
} catch (e: Exception) {
println("Error: ${e.message}")
}
}
Conclusion
This document demonstrates the essential components needed to implement an ATP client in Kotlin Multiplatform:- Core ATP client for communication with the ATP server
- Platform-specific notification handling for Android and iOS
- Notification management for tracking and responding to notifications
- Polling fallback for when WebSockets aren’t available
- Error handling with retry logic for response submission