MqttClient

class MqttClient(config: MqttConfig, scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default))

MQTT 5.0 client for Kotlin Multiplatform.

MqttClient is the primary entry point for connecting to MQTT brokers, publishing messages, and receiving subscriptions. It wraps the internal connection state machine with a coroutine-friendly, Flow-based API.

Creating a client

Use the MqttClient factory function with a client ID and optional configuration block (similar to Ktor's HttpClient { } pattern):

val client = MqttClient("sensor-hub") {
keepAliveSeconds = 30
autoReconnect = true
}

Or construct directly with an MqttConfig:

val client = MqttClient(MqttConfig(clientId = "sensor-hub", keepAliveSeconds = 30))

Connecting to a broker

// TCP (JVM, Android, iOS, macOS, Linux, Windows)
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 1883))

// TCP with TLS
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 8883, tls = true))

// WebSocket (wasmJs / browser)
client.connect(MqttEndpoint.WebSocket("wss://broker.example.com/mqtt"))

// Parse from URI string
client.connect(MqttEndpoint.parse("ssl://broker.example.com:8883"))

Publishing messages

// String payload (convenience)
client.publish("sensors/temp", "22.5", qos = QoS.AT_LEAST_ONCE)

// Full control with MqttMessage
client.publish(MqttMessage(
topic = "sensors/data",
payload = ByteString(sensorBytes),
qos = QoS.EXACTLY_ONCE,
retain = true,
))

// With MQTT 5.0 properties
client.publish("commands/response", responseJson,
properties = PublishProperties(
contentType = "application/json",
messageExpiryInterval = 3600,
),
)

Default publish options

Set client-level defaults so you don't repeat QoS/retain on every call (similar to Ktor's defaultRequest {}):

val client = MqttClient("sensor") {
defaultQos = QoS.AT_LEAST_ONCE
defaultRetain = true
}
client.publish("sensors/temp", "22.5") // uses QoS 1 + retain

Subscribing and receiving messages

Messages arrive via the messages flow. Start collecting before subscribing to avoid missing messages:

// Collect all messages
launch { client.messages.collect { msg -> println("${msg.topic}: ${msg.payloadAsString()}") } }

// Filter by exact topic
launch { client.messagesForTopic("sensors/temp").collect { /* ... */} }

// Filter by wildcard pattern
launch { client.messagesMatching("sensors/+/temp").collect { /* ... */} }

// Subscribe with QoS
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)

// Bulk subscribe
client.subscribe(mapOf("sensors/#" to QoS.AT_LEAST_ONCE, "commands/#" to QoS.EXACTLY_ONCE))

Connection state

Observe lifecycle transitions via connectionState:

client.connectionState.collect { state ->
when (state) {
is ConnectionState.Connected -> println("Online")
is ConnectionState.Reconnecting -> println("Reconnecting...")
is ConnectionState.Disconnected -> println("Offline")
else -> { /* CONNECTING */}
}
}

Will messages

Configure a will message using the DSL block in the config builder:

val client = MqttClient("sensor") {
will {
topic = "sensors/status"
payload("offline")
qos = QoS.AT_LEAST_ONCE
retain = true
}
}

Logging

val client = MqttClient("sensor") {
logger = MqttLogger.println()
logLevel = MqttLogLevel.DEBUG
}

Automatic reconnection

Enabled by default. The client reconnects with exponential backoff and re-establishes subscriptions automatically:

val client = MqttClient("sensor") {
autoReconnect = true // default
reconnectBaseDelayMs = 1000 // initial delay
reconnectMaxDelayMs = 30000 // max backoff
}

Enhanced authentication (§4.12)

For SASL-style challenge/response flows:

val client = MqttClient("secure-client") {
authenticationMethod = "SCRAM-SHA-256"
authenticationData = ByteString(initialResponse)
}

launch { client.authChallenges.collect { challenge -> client.sendAuthResponse(processChallenge(challenge)) } }

Cleanup

Always close the client when done to release resources:

client.close()

Or use the scoped use extension for automatic cleanup:

MqttClient("sensor").use(MqttEndpoint.parse("tcp://broker:1883")) { client ->
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)
client.publish("sensors/temp", "22.5")
delay(10_000)
}

Thread safety

All public methods are suspend functions safe to call from any coroutine. Internal state is protected by mutexes — concurrent calls to publish, subscribe, and disconnect are serialized correctly.

Parameters

config

Client configuration mapping to CONNECT packet fields (§3.1).

scope

Coroutine scope for background jobs. Defaults to a new SupervisorJob on Dispatchers.Default. The client creates a child scope, so cancelling the provided scope also cancels all client operations.

See also

Constructors

Link copied to clipboard
constructor(config: MqttConfig, scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default))

Types

Link copied to clipboard
object Companion

Companion object — entry point for static helpers like probe (a one-shot connectivity diagnostic that does not require constructing a long-lived client).

Properties

Link copied to clipboard
val authChallenges: SharedFlow<AuthChallenge>

Flow of authentication challenges from the broker during enhanced auth (§4.12).

Link copied to clipboard

Observable connection lifecycle state.

Link copied to clipboard
val messages: SharedFlow<MqttMessage>

Flow of incoming MQTT messages from subscribed topics.

Functions

Link copied to clipboard
suspend fun close()

Close the client and release all resources.

Link copied to clipboard
suspend fun connect(endpoint: MqttEndpoint)

Connect to the MQTT broker at the specified endpoint.

Link copied to clipboard
suspend fun disconnect()

Gracefully disconnect from the broker.

Link copied to clipboard

Returns a Flow of messages filtered to a specific topic.

Link copied to clipboard

Returns a Flow of messages whose topic matches the given MQTT topicFilter.

Link copied to clipboard
suspend fun publish(message: MqttMessage)

Publish an MqttMessage to the broker.

suspend fun publish(topic: String, payload: String, qos: QoS? = null, retain: Boolean? = null, properties: PublishProperties = PublishProperties())

Publish a message with a String payload and optional MQTT 5.0 properties.

Link copied to clipboard
suspend fun sendAuthResponse(data: ByteString)

Send an AUTH response to the broker during enhanced authentication (§4.12).

Link copied to clipboard
suspend fun subscribe(subscriptions: List<Subscription>)

Subscribe to multiple topic filters with full subscription options.

suspend fun subscribe(topicFilters: Map<String, QoS>)

Subscribe to multiple topic filters with per-topic QoS levels.

suspend fun subscribe(topicFilter: String, qos: QoS = QoS.AT_MOST_ONCE, noLocal: Boolean = false, retainAsPublished: Boolean = false, retainHandling: RetainHandling = RetainHandling.SEND_AT_SUBSCRIBE)

Subscribe to a single topic filter with the specified qos and MQTT 5.0 subscription options.

Link copied to clipboard
suspend fun unsubscribe(vararg topicFilters: String)

Unsubscribe from one or more topic filters.

Link copied to clipboard
suspend fun <T> MqttClient.use(endpoint: MqttEndpoint, block: suspend (MqttClient) -> T): T

Connect, execute block, then close — structured resource management.