Skip to main content

Kotlin Multiplatform ATP Client Examples

This document provides examples of implementing an ATP client in Kotlin Multiplatform, focusing on the core functionality for watching notifications and submitting responses.

Core Client Implementation

The main ATP client class handles notification watching and response submission:
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.util.date.getTimeMillis
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.*
import kotlinx.serialization.json.*

/**
 * Core ATP client for Kotlin Multiplatform.
 *
 * Streams notifications and status updates over a WebSocket, and offers REST
 * calls for listing pending notifications and submitting responses.
 *
 * [startWatching] and [stopWatching] mutate unsynchronized state; call them
 * from a single thread.
 *
 * @param baseUrl root URL of the ATP server, e.g. `https://atp.example.com`.
 * @param token bearer token sent with every request.
 * @param scope scope that owns the background stream coroutine.
 * @param onError invoked for connection failures and undecodable stream
 *   messages, so they are not dropped silently.
 */
class ATPClient(
    private val baseUrl: String,
    private val token: String,
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
    private val onError: (Throwable) -> Unit = {}
) {
    /**
     * Thrown by the REST calls when the server answers with a non-2xx status.
     * The message includes the response body so callers can inspect error codes.
     */
    class ApiException(
        val statusCode: Int,
        val responseBody: String
    ) : Exception("ATP request failed with HTTP $statusCode: $responseBody")

    // ignoreUnknownKeys: tolerate fields this client does not model.
    // encodeDefaults: defaulted properties (HeartbeatAck.type) must be written out.
    private val json = Json {
        ignoreUnknownKeys = true
        encodeDefaults = true
    }

    // HTTP client with WebSocket support
    private val httpClient = HttpClient {
        install(WebSockets)
    }

    private val apiRoot = baseUrl.trimEnd('/')

    // Flow of incoming notifications
    private val _notifications = MutableSharedFlow<Notification>(replay = 10)
    val notifications = _notifications.asSharedFlow()

    // Flow of status updates
    private val _statusUpdates = MutableSharedFlow<StatusUpdate>(replay = 5)
    val statusUpdates = _statusUpdates.asSharedFlow()

    // The running stream coroutine, if any.
    private var watchJob: Job? = null

    /**
     * Start the WebSocket stream. Returns immediately; the connection runs in
     * [scope] and is re-established with exponential backoff until
     * [stopWatching] is called. Calling this while a stream is already running
     * is a no-op.
     */
    fun startWatching() {
        if (watchJob?.isActive == true) return

        watchJob = scope.launch {
            var retryCount = 0
            while (isActive) {
                try {
                    httpClient.webSocket(
                        urlString = streamUrl(),
                        // The stream endpoint takes the token as a query parameter;
                        // parameter() URL-encodes it.
                        request = { parameter("token", token) }
                    ) {
                        retryCount = 0 // connected: start the next backoff from scratch
                        runStreamSession()
                    }
                } catch (e: Exception) {
                    // Rethrows if this coroutine was cancelled; otherwise the
                    // failure came from the connection and we retry.
                    ensureActive()
                    onError(e)
                }
                delay(calculateBackoffDelay(retryCount++))
            }
        }
    }

    /**
     * Stop the WebSocket stream and any pending reconnect attempt.
     */
    fun stopWatching() {
        watchJob?.cancel()
        watchJob = null
    }

    /**
     * Stop streaming and release the underlying HTTP client.
     */
    fun close() {
        stopWatching()
        httpClient.close()
    }

    /**
     * Fetch notifications via REST API.
     * Use this as fallback when WebSocket isn't available.
     *
     * @throws ApiException if the server answers with a non-2xx status.
     */
    suspend fun fetchPendingNotifications(
        status: String = "pending",
        limit: Int = 20,
        cursor: String? = null
    ): NotificationResponse {
        val response = httpClient.get("$apiRoot/api/v1/client/notifications") {
            header(HttpHeaders.Authorization, "Bearer $token")
            parameter("status", status)
            parameter("limit", limit)
            cursor?.let { parameter("cursor", it) }
        }

        return json.decodeFromString(NotificationResponse.serializer(), response.bodyOrThrow())
    }

    /**
     * Submit a response to a notification.
     *
     * @param responseData `null`, a [Boolean], [String], [Number], [JsonElement],
     *   or a (possibly nested) [List] of those.
     * @throws IllegalArgumentException if [responseData] contains any other type.
     * @throws ApiException if the server answers with a non-2xx status.
     */
    suspend fun respond(
        notificationId: String,
        actionId: String,
        responseData: Any?
    ): ResponseResult {
        val requestBody = buildJsonObject {
            put("notification_id", JsonPrimitive(notificationId))
            put("action_id", JsonPrimitive(actionId))
            put("response_data", toJsonElement(responseData))
        }

        val response = httpClient.post("$apiRoot/api/v1/client/respond") {
            header(HttpHeaders.Authorization, "Bearer $token")
            contentType(ContentType.Application.Json)
            setBody(requestBody.toString())
        }

        return json.decodeFromString(ResponseResult.serializer(), response.bodyOrThrow())
    }

    /**
     * Runs one connected session: sends a heartbeat acknowledgement every
     * [HEARTBEAT_INTERVAL_MS] and dispatches incoming text frames until the
     * connection closes.
     */
    private suspend fun DefaultClientWebSocketSession.runStreamSession() {
        val heartbeatJob = launch {
            while (isActive) {
                val ack = HeartbeatAck(getTimeMillis().toString())
                send(Frame.Text(json.encodeToString(HeartbeatAck.serializer(), ack)))
                delay(HEARTBEAT_INTERVAL_MS)
            }
        }

        try {
            for (frame in incoming) {
                if (frame is Frame.Text) {
                    handleWebSocketMessage(frame.readText())
                }
            }
        } finally {
            heartbeatJob.cancel()
        }
    }

    /**
     * Decode one stream message and publish it on the matching flow.
     * Malformed or incomplete messages are reported through [onError] and
     * skipped, so a single bad frame does not drop the connection.
     */
    private suspend fun handleWebSocketMessage(messageText: String) {
        try {
            val envelope = json.parseToJsonElement(messageText) as? JsonObject ?: return
            val type = (envelope["type"] as? JsonPrimitive)?.content ?: return

            when (type) {
                "notification" -> {
                    val data = envelope["data"] ?: return
                    _notifications.emit(json.decodeFromJsonElement(Notification.serializer(), data))
                }
                "status_update" -> {
                    val data = envelope["data"] ?: return
                    _statusUpdates.emit(json.decodeFromJsonElement(StatusUpdate.serializer(), data))
                }
                "heartbeat" -> {
                    // Nothing to do here: acknowledgements are sent on a timer
                    // by runStreamSession.
                }
            }
        } catch (e: IllegalArgumentException) {
            // SerializationException is an IllegalArgumentException, so this
            // covers both parse and decode failures.
            onError(e)
        }
    }

    /**
     * Convert a loosely typed response value into JSON. Integral numbers keep
     * their integral form, and `null` list elements become JSON `null`.
     */
    private fun toJsonElement(value: Any?): JsonElement = when (value) {
        null -> JsonNull
        is JsonElement -> value
        is Boolean -> JsonPrimitive(value)
        is String -> JsonPrimitive(value)
        is Number -> JsonPrimitive(value)
        is List<*> -> JsonArray(value.map { toJsonElement(it) })
        else -> throw IllegalArgumentException(
            "Unsupported response_data type: ${value::class.simpleName}"
        )
    }

    /**
     * Map the HTTP(S) base URL to the WebSocket stream URL, keeping TLS when
     * the base URL uses it.
     */
    private fun streamUrl(): String {
        val wsRoot = when {
            apiRoot.startsWith("https://") -> "wss://" + apiRoot.removePrefix("https://")
            apiRoot.startsWith("http://") -> "ws://" + apiRoot.removePrefix("http://")
            apiRoot.startsWith("wss://") || apiRoot.startsWith("ws://") -> apiRoot
            else -> "ws://$apiRoot"
        }
        return "$wsRoot/api/v1/client/stream"
    }

    /**
     * Exponential backoff: 1s, 2s, 4s, ... capped at [MAX_BACKOFF_MS].
     */
    private fun calculateBackoffDelay(attempt: Int): Long {
        val exponent = attempt.coerceIn(0, 16) // bound the shift so it cannot overflow
        return (INITIAL_BACKOFF_MS shl exponent).coerceAtMost(MAX_BACKOFF_MS)
    }

    /**
     * Read the body and fail with [ApiException] unless the status is 2xx.
     */
    private suspend fun HttpResponse.bodyOrThrow(): String {
        val body = bodyAsText()
        if (!status.isSuccess()) {
            throw ApiException(status.value, body)
        }
        return body
    }

    private companion object {
        const val HEARTBEAT_INTERVAL_MS = 25_000L
        const val INITIAL_BACKOFF_MS = 1_000L
        const val MAX_BACKOFF_MS = 60_000L
    }
}

Data Models

The key data models required for working with ATP:
/**
 * Core notification model.
 *
 * Decoded from the `data` element of `notification` stream messages and from
 * the REST notification listing. [deadline] and [status] default to `null`
 * when the payload omits them.
 */
@Serializable
data class Notification(
    val id: String,
    val version: String,
    val timestamp: String,
    val deadline: String? = null,
    val service: Service,
    val context: Context,
    val actions: List<Action>,
    val status: String? = null,
)

/**
 * The service a [Notification] belongs to. [icon] is optional and defaults to
 * `null`.
 */
@Serializable
data class Service(
    val id: String,
    val name: String,
    val icon: String? = null,
)

/**
 * Descriptive content of a [Notification]. [title] and [description] are
 * required; [project], [metadata] and [attachments] default to `null` when
 * absent.
 */
@Serializable
data class Context(
    val title: String,
    val description: String,
    val project: String? = null,
    val metadata: JsonObject? = null,
    val attachments: List<Attachment>? = null,
)

/**
 * An attachment listed in [Context.attachments]. Only [type] is required;
 * [description], [uri] and [data] default to `null`.
 */
@Serializable
data class Attachment(
    val type: String,
    val description: String? = null,
    val uri: String? = null,
    val data: String? = null,
)

/**
 * One action offered by a [Notification].
 *
 * Property names mirror the JSON keys because no `@SerialName` mapping is
 * declared. [flags], [options] and [constraints] default to `null`; [options]
 * is kept as a raw [JsonElement], so its shape is not fixed by this model.
 */
@Serializable
data class Action(
    val id: String,
    val label: String,
    val response_type: String,
    val flags: List<String>? = null,
    val options: JsonElement? = null,
    val constraints: JsonObject? = null,
)

/**
 * Status change for a notification, decoded from the `data` element of
 * `status_update` stream messages. Property names mirror the JSON keys;
 * [reason] defaults to `null`.
 */
@Serializable
data class StatusUpdate(
    val notification_id: String,
    val status: String,
    val reason: String? = null,
    val timestamp: String,
)

/**
 * Body of the REST notification listing: one page of [notifications] plus its
 * [pagination] details.
 */
@Serializable
data class NotificationResponse(
    val notifications: List<Notification>,
    val pagination: Pagination,
)

/**
 * Paging details of a [NotificationResponse]. Property names mirror the JSON
 * keys. [next_cursor] defaults to `null` when absent.
 */
@Serializable
data class Pagination(
    val next_cursor: String? = null,
    val has_more: Boolean,
    val total_count: Int,
)

/**
 * Server reply to a submitted response. Property names mirror the JSON keys;
 * every field is required.
 */
@Serializable
data class ResponseResult(
    val notification_id: String,
    val action_id: String,
    val status: String,
    val responded_at: String,
)

/**
 * Heartbeat acknowledgement frame sent by the client over the WebSocket stream.
 *
 * [type] is a discriminator with a default value. kotlinx.serialization omits
 * defaulted properties unless told otherwise (`encodeDefaults` is `false` by
 * default), which would produce a frame without `"type"`. [EncodeDefault]
 * forces it to be written regardless of the `Json` configuration in use.
 */
@OptIn(ExperimentalSerializationApi::class)
@Serializable
data class HeartbeatAck(
    val timestamp: String,
    @EncodeDefault val type: String = "heartbeat_ack",
)

Platform-Specific Notification Handling

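Kotlin Multiplatform allows us to implement platform-specific notification handling while sharing the core ATP client. The `expect` declaration belongs in the common source set, and each `actual` class belongs in its own platform source set (for example `androidMain` and `iosMain`); they are shown together here only for brevity: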
/**
 * Platform-specific notification handling, declared here for common code and
 * implemented once per target by an `actual` class.
 *
 * NOTE(review): no constructor is declared, so common code cannot instantiate
 * this class directly; instances presumably come from platform code — confirm.
 */
expect class PlatformNotificationHandler {
    /** Present [notification] to the user using the platform's notification facility. */
    fun showNotification(notification: Notification)

    /** Withdraw the platform notification previously shown for [notificationId]. */
    fun cancelNotification(notificationId: String)
}

/**
 * Android implementation.
 *
 * Posts notifications through the system [android.app.NotificationManager].
 * The platform notification id is the hash code of the ATP notification id,
 * so showing the same notification twice replaces it, and
 * [cancelNotification] can find it again.
 *
 * @param context used to resolve the system notification service and to build
 *   notifications.
 */
actual class PlatformNotificationHandler(
    private val context: android.content.Context
) {
    // Resolved once instead of on every call.
    private val notificationManager: android.app.NotificationManager by lazy {
        context.getSystemService(
            android.content.Context.NOTIFICATION_SERVICE
        ) as android.app.NotificationManager
    }

    /**
     * Build and post a notification with the title and description of
     * [notification].
     */
    @Suppress("DEPRECATION") // the channel-less Builder is the only option before API 26
    actual fun showNotification(notification: Notification) {
        val builder = if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) {
            // Channels exist from Android O on. Creating one that already
            // exists is a no-op, so this is safe to repeat.
            val channel = android.app.NotificationChannel(
                CHANNEL_ID,
                CHANNEL_NAME,
                android.app.NotificationManager.IMPORTANCE_DEFAULT
            )
            notificationManager.createNotificationChannel(channel)
            android.app.Notification.Builder(context, CHANNEL_ID)
        } else {
            // Builder(Context, String) was added in API 26; calling it on
            // older releases would crash.
            android.app.Notification.Builder(context)
        }

        builder
            .setContentTitle(notification.context.title)
            .setContentText(notification.context.description)
            .setSmallIcon(android.R.drawable.ic_dialog_info)

        notificationManager.notify(notification.id.hashCode(), builder.build())
    }

    /**
     * Cancel the platform notification posted for [notificationId].
     */
    actual fun cancelNotification(notificationId: String) {
        notificationManager.cancel(notificationId.hashCode())
    }

    private companion object {
        const val CHANNEL_ID = "atp_channel"
        const val CHANNEL_NAME = "ATP Notifications"
    }
}

/**
 * iOS implementation.
 *
 * Schedules local notifications through `UNUserNotificationCenter`, using the
 * ATP notification id as the request identifier so [cancelNotification] can
 * remove it later.
 *
 * NOTE(review): this class never requests notification authorization;
 * presumably the host app does so before notifications are shown — confirm.
 */
actual class PlatformNotificationHandler {
    private val center: platform.UserNotifications.UNUserNotificationCenter
        get() = platform.UserNotifications.UNUserNotificationCenter.currentNotificationCenter()

    /**
     * Schedule an immediate notification with the title and description of
     * [notification].
     */
    actual fun showNotification(notification: Notification) {
        val content = platform.UserNotifications.UNMutableNotificationContent().apply {
            setTitle(notification.context.title)
            setBody(notification.context.description)
            setSound(platform.UserNotifications.UNNotificationSound.defaultSound)
        }

        // UNNotificationRequest is created through its factory method; it has
        // no public constructor taking these arguments. A null trigger
        // delivers the notification right away.
        val request = platform.UserNotifications.UNNotificationRequest.requestWithIdentifier(
            identifier = notification.id,
            content = content,
            trigger = null
        )

        center.addNotificationRequest(request, withCompletionHandler = { error ->
            // Report scheduling failures instead of discarding them.
            if (error != null) {
                println("Failed to schedule notification ${notification.id}: ${error.localizedDescription}")
            }
        })
    }

    /**
     * Remove the notification for [notificationId], whether it is still
     * pending or already delivered.
     */
    actual fun cancelNotification(notificationId: String) {
        val identifiers = listOf(notificationId)
        center.removePendingNotificationRequestsWithIdentifiers(identifiers)
        center.removeDeliveredNotificationsWithIdentifiers(identifiers)
    }
}

Notification Manager

A class to handle notifications across platforms:
/**
 * Manages notifications and interactions with ATP.
 *
 * Keeps an in-memory index of the notifications received from [atpClient],
 * mirrors them to the platform through [platformHandler], and removes them
 * again once they are answered or the server reports them as `invalidated` or
 * `responded`.
 *
 * @param scope scope that owns the two collector coroutines started by
 *   [initialize].
 */
class NotificationManager(
    private val atpClient: ATPClient,
    private val platformHandler: PlatformNotificationHandler,
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
) {
    // Immutable snapshots swapped atomically: the collectors and callers of
    // the accessors below may run on different threads.
    private val notifications = MutableStateFlow<Map<String, Notification>>(emptyMap())

    // Guards against launching a second pair of collectors.
    private var initialized = false

    /**
     * Initialize notification handling: start collecting from the client and
     * open its notification stream. Repeated calls are ignored.
     */
    fun initialize() {
        if (initialized) return
        initialized = true

        // Collect notifications from ATP client
        scope.launch {
            atpClient.notifications.collect { notification ->
                notifications.update { current -> current + (notification.id to notification) }
                platformHandler.showNotification(notification)
            }
        }

        // Collect status updates
        scope.launch {
            atpClient.statusUpdates.collect { update ->
                if (update.status == "invalidated" || update.status == "responded") {
                    forget(update.notification_id)
                }
            }
        }

        // Start watching for notifications
        atpClient.startWatching()
    }

    /**
     * Submit a response to a notification.
     *
     * On success the notification is dropped from storage and its platform
     * notification is cancelled. Failures are returned as [Result.failure];
     * coroutine cancellation is rethrown rather than wrapped.
     */
    suspend fun respondToNotification(
        notificationId: String,
        actionId: String,
        responseData: Any?
    ): Result<ResponseResult> {
        return try {
            val result = atpClient.respond(notificationId, actionId, responseData)
            forget(notificationId)
            Result.success(result)
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            Result.failure(e)
        }
    }

    /**
     * Get all pending notifications, in the order they were first received.
     */
    fun getPendingNotifications(): List<Notification> = notifications.value.values.toList()

    /**
     * Get a notification by ID, or `null` if it is not currently stored.
     */
    fun getNotification(notificationId: String): Notification? = notifications.value[notificationId]

    /**
     * Drop a notification from storage and cancel its platform notification.
     */
    private fun forget(notificationId: String) {
        notifications.update { current -> current - notificationId }
        platformHandler.cancelNotification(notificationId)
    }
}

Alternative Polling Implementation

For environments where WebSockets aren’t available:
/**
 * Polls for notifications periodically.
 *
 * Each poll fetches one page of pending notifications from [atpClient], shows
 * them through [platformHandler], and remembers the returned cursor for the
 * next poll.
 *
 * @param scope scope that owns the polling coroutine.
 */
class NotificationPoller(
    private val atpClient: ATPClient,
    private val platformHandler: PlatformNotificationHandler,
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
) {
    // The running poll loop, if any. Tracking the Job rather than a boolean
    // flag means stop-then-start cannot leave two loops running.
    private var pollingJob: Job? = null
    private var cursor: String? = null

    /**
     * Start polling for notifications. Does nothing if polling is already
     * running.
     *
     * @param intervalMs base wait between polls; doubled after an empty poll
     *   and capped at two minutes.
     * @throws IllegalArgumentException if [intervalMs] is not positive.
     */
    fun startPolling(intervalMs: Long = 30000) {
        require(intervalMs > 0) { "intervalMs must be positive, was $intervalMs" }
        if (pollingJob?.isActive == true) return

        pollingJob = scope.launch {
            while (isActive) {
                val waitMs = try {
                    pollOnce(intervalMs)
                } catch (e: CancellationException) {
                    throw e
                } catch (e: Exception) {
                    // Error backoff
                    ERROR_BACKOFF_MS
                }
                delay(waitMs)
            }
        }
    }

    /**
     * Stop polling.
     */
    fun stopPolling() {
        pollingJob?.cancel()
        pollingJob = null
    }

    /**
     * Run a single poll and return how long to wait before the next one.
     */
    private suspend fun pollOnce(intervalMs: Long): Long {
        val response = atpClient.fetchPendingNotifications(cursor = cursor)

        response.notifications.forEach { notification ->
            platformHandler.showNotification(notification)
        }

        // Update cursor for next poll
        cursor = response.pagination.next_cursor

        // Adaptive interval: slow down when nothing came back.
        val nextWaitMs = if (response.notifications.isEmpty()) intervalMs * 2 else intervalMs
        return minOf(nextWaitMs, MAX_INTERVAL_MS)
    }

    private companion object {
        const val MAX_INTERVAL_MS = 120_000L
        const val ERROR_BACKOFF_MS = 60_000L
    }
}

Error Handling

Robust error handling for response submission:
/**
 * Submit a response with retry capability.
 *
 * Makes up to `maxRetries + 1` attempts. Between attempts it waits
 * `initialDelayMs * 2^attempt` (1s, 2s, 4s with the defaults). Errors that
 * [isTerminalError] classifies as terminal are returned immediately without
 * retrying. Coroutine cancellation is rethrown, never retried or wrapped.
 *
 * @param maxRetries number of retries after the first attempt; must not be
 *   negative.
 * @param initialDelayMs wait before the first retry; must not be negative.
 * @return the server result, or the last failure.
 * @throws IllegalArgumentException if [maxRetries] or [initialDelayMs] is
 *   negative.
 */
suspend fun submitResponseWithRetry(
    atpClient: ATPClient,
    notificationId: String,
    actionId: String,
    responseData: Any?,
    maxRetries: Int = 3,
    initialDelayMs: Long = 1000L
): Result<ResponseResult> {
    require(maxRetries >= 0) { "maxRetries must not be negative, was $maxRetries" }
    require(initialDelayMs >= 0) { "initialDelayMs must not be negative, was $initialDelayMs" }

    for (attempt in 0..maxRetries) {
        try {
            return Result.success(atpClient.respond(notificationId, actionId, responseData))
        } catch (e: CancellationException) {
            // Cancellation is not a failure to report or retry.
            throw e
        } catch (e: Exception) {
            val outOfRetries = attempt == maxRetries
            if (outOfRetries || isTerminalError(e)) {
                return Result.failure(e)
            }
            // Exponential backoff; the shift is bounded so it cannot overflow.
            delay(initialDelayMs * (1L shl attempt.coerceAtMost(20)))
        }
    }

    // Not reachable: the final loop iteration always returns. Kept so the
    // function has a total return path.
    return Result.failure(Exception("Max retries exceeded"))
}

/**
 * Decide whether a failure is terminal, i.e. retrying cannot succeed.
 *
 * The check is a case-insensitive substring match on the exception message
 * against the known terminal markers: already responded, expired, invalidated,
 * and invalid response data. An exception without a message is never terminal.
 */
private fun isTerminalError(error: Exception): Boolean {
    val normalized = error.message.orEmpty().lowercase()
    val terminalMarkers = listOf(
        "already_responded",
        "expired",
        "invalidated",
        "invalid_response_data",
    )
    return terminalMarkers.any { marker -> marker in normalized }
}

Integration Example

Here’s an example of integrating the ATP client into a multiplatform application:
/**
 * Sets up ATP for a multiplatform application.
 *
 * By default notifications arrive over the WebSocket stream, which
 * `NotificationManager.initialize()` opens itself, so no separate
 * `startWatching()` call is made here. `startWatching()` returns immediately
 * and handles connection errors internally, which means a failed connection
 * cannot be detected from this function. Polling is therefore an explicit
 * choice rather than an automatic fallback.
 *
 * @param token bearer token for the ATP server.
 * @param usePolling when `true`, poll the REST API instead of opening the
 *   WebSocket stream. Polled notifications are shown on the platform but are
 *   not tracked by the returned manager.
 * @return the notification manager, to be used for submitting responses.
 */
fun setupATP(token: String, usePolling: Boolean = false): NotificationManager {
    // Create ATP client
    val atpClient = ATPClient(
        baseUrl = "https://atp.example.com",
        token = token
    )

    // Create platform-specific notification handler
    val platformHandler = PlatformNotificationHandler()

    // Create notification manager
    val notificationManager = NotificationManager(atpClient, platformHandler)

    if (usePolling) {
        // For environments where WebSockets aren't available
        NotificationPoller(atpClient, platformHandler).startPolling()
    } else {
        // Starts collecting and opens the WebSocket stream
        notificationManager.initialize()
    }

    return notificationManager
}

/**
 * Example of responding to a notification.
 *
 * Submits the response through [notificationManager] and prints the outcome.
 * The manager already reports failures through the returned [Result], so no
 * surrounding catch-all is needed. Adding one would also intercept coroutine
 * cancellation.
 */
suspend fun respondToNotification(
    notificationManager: NotificationManager,
    notificationId: String,
    actionId: String,
    responseData: Any?
) {
    val outcome = notificationManager.respondToNotification(
        notificationId = notificationId,
        actionId = actionId,
        responseData = responseData
    )

    outcome.fold(
        onSuccess = { response ->
            println("Response submitted successfully: ${response.status}")
        },
        onFailure = { error ->
            println("Failed to submit response: ${error.message}")
        }
    )
}

Conclusion

This document demonstrates the essential components needed to implement an ATP client in Kotlin Multiplatform:
  1. Core ATP client for communication with the ATP server
  2. Platform-specific notification handling for Android and iOS
  3. Notification management for tracking and responding to notifications
  4. Polling fallback for when WebSockets aren’t available
  5. Error handling with retry logic for response submission
The implementation focuses on the key components for watching for notifications and submitting responses, which can be integrated into any Kotlin Multiplatform application.