MqttClient

class MqttClient(config: MqttConfig, scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default))

MQTT 5.0 client for Kotlin Multiplatform.

MqttClient is the primary entry point for connecting to MQTT brokers, publishing messages, and receiving messages from subscriptions. It wraps the internal connection state machine in a coroutine-friendly, Flow-based API.

Creating a client

Use the MqttClient factory function with a client ID and optional configuration block (similar to Ktor's HttpClient { } pattern):

val client = MqttClient("sensor-hub") {
    keepAliveSeconds = 30
    autoReconnect = true
}

Or construct directly with an MqttConfig:

val client = MqttClient(MqttConfig(clientId = "sensor-hub", keepAliveSeconds = 30))

Connecting to a broker

// TCP (JVM, Android, iOS, macOS, Linux, Windows)
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 1883))

// TCP with TLS
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 8883, tls = true))

// WebSocket (wasmJs / browser)
client.connect(MqttEndpoint.WebSocket("wss://broker.example.com/mqtt"))

// Parse from URI string
client.connect(MqttEndpoint.parse("ssl://broker.example.com:8883"))
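To make the URI form concrete, here is a minimal sketch of how a string such as ssl://broker.example.com:8883 could be split into scheme, host, and port. ParsedEndpoint and parseEndpointUri are hypothetical names and not part of the library. The scheme table (tcp, ssl, ws, wss with the conventional ports 1883, 8883, 80, 443) is an assumption; the schemes that MqttEndpoint.parse actually accepts are not listed on this page.

```kotlin
// Hypothetical stand-in for illustration; not the library's MqttEndpoint.
data class ParsedEndpoint(val scheme: String, val host: String, val port: Int, val tls: Boolean)

// Assumed defaults: conventional MQTT and WebSocket ports.
val defaultPorts = mapOf("tcp" to 1883, "ssl" to 8883, "ws" to 80, "wss" to 443)

fun parseEndpointUri(uri: String): ParsedEndpoint {
    val scheme = uri.substringBefore("://", missingDelimiterValue = "").lowercase()
    require(scheme in defaultPorts) { "Unsupported or missing scheme in '$uri'" }
    // The authority is everything between "://" and the first "/"; the path is ignored here.
    // IPv6 literals such as [::1] are not handled in this sketch.
    val authority = uri.substringAfter("://").substringBefore('/')
    val host = authority.substringBefore(':')
    require(host.isNotEmpty()) { "Missing host in '$uri'" }
    val port = if (':' in authority) {
        authority.substringAfter(':').toIntOrNull()
            ?: throw IllegalArgumentException("Invalid port in '$uri'")
    } else {
        defaultPorts.getValue(scheme)
    }
    return ParsedEndpoint(scheme, host, port, tls = scheme == "ssl" || scheme == "wss")
}

fun main() {
    println(parseEndpointUri("ssl://broker.example.com:8883"))
    println(parseEndpointUri("tcp://broker"))
}
```

In this sketch an explicit port always wins over the scheme default, and TLS is inferred from the scheme alone.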

Publishing messages

// String payload (convenience)
client.publish("sensors/temp", "22.5", qos = QoS.AT_LEAST_ONCE)

// Full control with MqttMessage
client.publish(MqttMessage(
    topic = "sensors/data",
    payload = ByteString(sensorBytes),
    qos = QoS.EXACTLY_ONCE,
    retain = true,
))

// With MQTT 5.0 properties
client.publish(
    "commands/response",
    responseJson,
    properties = PublishProperties(
        contentType = "application/json",
        messageExpiryInterval = 3600,
    ),
)

Default publish options

Set client-level defaults so you don't repeat QoS/retain on every call (similar to Ktor's defaultRequest {}):

val client = MqttClient("sensor") {
    defaultQos = QoS.AT_LEAST_ONCE
    defaultRetain = true
}
client.publish("sensors/temp", "22.5") // uses QoS 1 + retain
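The String publish overload declares qos and retain as nullable parameters, which suggests that null means "use the client default". The following is a minimal sketch of that fallback, assuming this is how the resolution works. Qos, Defaults, Resolved, and resolve are hypothetical stand-ins, not library types.

```kotlin
// Hypothetical stand-ins for illustration; the library's own types are QoS and MqttConfig.
enum class Qos { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

data class Defaults(val qos: Qos = Qos.AT_MOST_ONCE, val retain: Boolean = false)

data class Resolved(val qos: Qos, val retain: Boolean)

// A null argument falls back to the client-level default; an explicit value wins.
fun resolve(defaults: Defaults, qos: Qos? = null, retain: Boolean? = null): Resolved =
    Resolved(qos ?: defaults.qos, retain ?: defaults.retain)

fun main() {
    val defaults = Defaults(qos = Qos.AT_LEAST_ONCE, retain = true)
    println(resolve(defaults))                          // both values come from the defaults
    println(resolve(defaults, qos = Qos.EXACTLY_ONCE))  // explicit QoS, default retain
    println(resolve(defaults, retain = false))          // default QoS, explicit retain
}
```

Using null rather than a concrete default in the signature lets a caller override a default of true with an explicit false, which a plain Boolean default could not express.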

Subscribing and receiving messages

Messages arrive via the messages flow. Start collecting before subscribing to avoid missing messages:

// Collect all messages
launch { client.messages.collect { msg -> println("${msg.topic}: ${msg.payloadAsString()}") } }

// Filter by exact topic
launch { client.messagesForTopic("sensors/temp").collect { /* ... */} }

// Filter by wildcard pattern
launch { client.messagesMatching("sensors/+/temp").collect { /* ... */} }

// Subscribe with QoS
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)

// Bulk subscribe
client.subscribe(mapOf("sensors/#" to QoS.AT_LEAST_ONCE, "commands/#" to QoS.EXACTLY_ONCE))
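The wildcard patterns above follow the MQTT topic filter rules: + matches exactly one topic level, # matches the parent level and any number of child levels and must be the last level, and topics that start with $ are not matched by a filter that begins with a wildcard. The sketch below implements those rules as a standalone function. topicMatches is a hypothetical name, and whether messagesMatching uses this exact logic internally is an assumption.

```kotlin
// Hypothetical helper for illustration; not part of the library API.
fun topicMatches(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    // Topics beginning with '$' are not matched by a filter that starts with a wildcard.
    if (topic.startsWith('$') && (f[0] == "+" || f[0] == "#")) return false
    for (i in f.indices) {
        when {
            f[i] == "#" -> return i == f.lastIndex      // '#' is only valid as the last level
            i >= t.size -> return false                 // filter has more levels than the topic
            f[i] != "+" && f[i] != t[i] -> return false // literal level must match exactly
        }
    }
    // Without a trailing '#', filter and topic must have the same number of levels.
    return f.size == t.size
}

fun main() {
    println(topicMatches("sensors/+/temp", "sensors/kitchen/temp"))
    println(topicMatches("sensors/+/temp", "sensors/kitchen/hall/temp"))
    println(topicMatches("sensors/#", "sensors"))
    println(topicMatches("#", "\$SYS/uptime"))
}
```

Note that sensors/# also matches the bare topic sensors, because # covers the parent level as well as its children.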

Connection state

Observe lifecycle transitions via connectionState:

client.connectionState.collect { state ->
    when (state) {
        is ConnectionState.Connected -> println("Online")
        is ConnectionState.Reconnecting -> println("Reconnecting...")
        is ConnectionState.Disconnected -> println("Offline")
        else -> { /* CONNECTING */ }
    }
}

Will messages

Configure a will message using the DSL block in the config builder:

val client = MqttClient("sensor") {
    will {
        topic = "sensors/status"
        payload("offline")
        qos = QoS.AT_LEAST_ONCE
        retain = true
    }
}

Logging

val client = MqttClient("sensor") {
    logger = MqttLogger.println()
    logLevel = MqttLogLevel.DEBUG
}

Automatic reconnection

Enabled by default. The client reconnects with exponential backoff and re-establishes subscriptions automatically:

val client = MqttClient("sensor") {
    autoReconnect = true          // default
    reconnectBaseDelayMs = 1000   // initial delay
    reconnectMaxDelayMs = 30000   // max backoff
}
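To show how the base and maximum delays interact, here is a minimal sketch of a capped exponential backoff schedule. The doubling factor and the absence of jitter are assumptions, since this page does not state the multiplier the client uses. reconnectDelayMs is a hypothetical helper, not a library function.

```kotlin
// Assumed schedule: the delay doubles per attempt starting at baseMs and is capped at maxMs.
// No jitter is applied in this sketch.
fun reconnectDelayMs(attempt: Int, baseMs: Long = 1_000, maxMs: Long = 30_000): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Cap the shift so the multiplication stays in range for realistic base delays.
    val factor = 1L shl minOf(attempt, 30)
    return minOf(baseMs * factor, maxMs)
}

fun main() {
    // Delays for the first seven attempts with the values from the example above.
    println((0..6).map { reconnectDelayMs(it) })
    // prints [1000, 2000, 4000, 8000, 16000, 30000, 30000]
}
```

Under these assumptions the cap is reached on the sixth attempt, after which every retry waits the full 30 seconds.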

Enhanced authentication (§4.12)

For SASL-style challenge/response flows:

val client = MqttClient("secure-client") {
    authenticationMethod = "SCRAM-SHA-256"
    authenticationData = ByteString(initialResponse)
}

launch { client.authChallenges.collect { challenge -> client.sendAuthResponse(processChallenge(challenge)) } }

Cleanup

Always close the client when done to release resources:

client.close()

Or use the scoped use extension for automatic cleanup:

MqttClient("sensor").use(MqttEndpoint.parse("tcp://broker:1883")) { client ->
    client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)
    client.publish("sensors/temp", "22.5")
    delay(10_000)
}
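The connect, run, close sequence behind a use-style extension can be sketched with try/finally. This is an illustration of the pattern only, not the library's implementation: Connection, useWith, and RecordingConnection are hypothetical, and the functions are written without suspend so the sketch runs with the standard library alone.

```kotlin
// Hypothetical minimal interface for illustration; the real client methods are suspend functions.
interface Connection {
    fun connect(endpoint: String)
    fun close()
}

// Connect, run the block, and close even when connect or the block throws.
fun <C : Connection, T> C.useWith(endpoint: String, block: (C) -> T): T {
    try {
        connect(endpoint)
        return block(this)
    } finally {
        close()
    }
}

// Test double that records the order of lifecycle calls.
class RecordingConnection : Connection {
    val events = mutableListOf<String>()
    override fun connect(endpoint: String) { events += "connect $endpoint" }
    override fun close() { events += "close" }
}

fun main() {
    val ok = RecordingConnection()
    println(ok.useWith("tcp://broker:1883") { "done" })
    println(ok.events)

    val failing = RecordingConnection()
    runCatching { failing.useWith("tcp://broker:1883") { error("boom") } }
    println(failing.events) // close is still recorded after the failure
}
```

The finally block is what makes the cleanup unconditional: the caller gets either the block's result or its exception, and close runs in both cases.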

Thread safety

All suspend functions on the client are safe to call from any coroutine. Internal state is protected by mutexes, so concurrent calls to publish, subscribe, and disconnect are serialized correctly.

Parameters

config

Client configuration mapping to CONNECT packet fields (§3.1).

scope

Coroutine scope for background jobs. Defaults to a new scope built from a SupervisorJob on Dispatchers.Default. The client creates a child scope, so cancelling the provided scope also cancels all client operations.

Constructors

constructor(config: MqttConfig, scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default))

Types

object Companion

Companion object — entry point for static helpers like probe (a one-shot connectivity diagnostic that does not require constructing a long-lived client).

Properties

val authChallenges: SharedFlow<AuthChallenge>

Flow of authentication challenges from the broker during enhanced auth (§4.12).

connectionState

Observable connection lifecycle state.

val messages: SharedFlow<MqttMessage>

Flow of incoming MQTT messages from subscribed topics.

The protocol version actually in use after connection negotiation.

Functions

suspend fun close()

Close the client and release all resources.

suspend fun connect(endpoint: MqttEndpoint)

Connect to the MQTT broker at the specified endpoint.

suspend fun disconnect()

Gracefully disconnect from the broker.

messagesForTopic

Returns a Flow of messages filtered to a specific topic.

messagesMatching

Returns a Flow of messages whose topic matches the given MQTT topicFilter.

suspend fun publish(message: MqttMessage)

Publish an MqttMessage to the broker.

suspend fun publish(topic: String, payload: String, qos: QoS? = null, retain: Boolean? = null, properties: PublishProperties = PublishProperties())

Publish a message with a String payload and optional MQTT 5.0 properties.

suspend fun sendAuthResponse(data: ByteString)

Send an AUTH response to the broker during enhanced authentication (§4.12).

suspend fun subscribe(subscriptions: List<Subscription>)

Subscribe to multiple topic filters with full subscription options.

suspend fun subscribe(topicFilters: Map<String, QoS>)

Subscribe to multiple topic filters with per-topic QoS levels.

suspend fun subscribe(topicFilter: String, qos: QoS = QoS.AT_MOST_ONCE, noLocal: Boolean = false, retainAsPublished: Boolean = false, retainHandling: RetainHandling = RetainHandling.SEND_AT_SUBSCRIBE)

Subscribe to a single topic filter with the specified qos and MQTT 5.0 subscription options.

suspend fun unsubscribe(vararg topicFilters: String)

Unsubscribe from one or more topic filters.

suspend fun <T> MqttClient.use(endpoint: MqttEndpoint, block: suspend (MqttClient) -> T): T

Connect, execute block, then close — structured resource management.