MqttClient
MQTT 5.0 client for Kotlin Multiplatform.
MqttClient is the primary entry point for connecting to MQTT brokers, publishing messages, and subscribing to topics and receiving their messages. It wraps the internal connection state machine with a coroutine-friendly, Flow-based API.
Creating a client
Use the MqttClient factory function with a client ID and an optional configuration block (similar to Ktor's HttpClient { } pattern):
val client = MqttClient("sensor-hub") {
    keepAliveSeconds = 30
    autoReconnect = true
}

Or construct directly with an MqttConfig:

val client = MqttClient(MqttConfig(clientId = "sensor-hub", keepAliveSeconds = 30))

Connecting to a broker
// TCP (JVM, Android, iOS, macOS, Linux, Windows)
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 1883))
// TCP with TLS
client.connect(MqttEndpoint.Tcp("broker.example.com", port = 8883, tls = true))
// WebSocket (wasmJs / browser)
client.connect(MqttEndpoint.WebSocket("wss://broker.example.com/mqtt"))
// Parse from URI string
client.connect(MqttEndpoint.parse("ssl://broker.example.com:8883"))

Publishing messages
// String payload (convenience)
client.publish("sensors/temp", "22.5", qos = QoS.AT_LEAST_ONCE)
// Full control with MqttMessage
client.publish(MqttMessage(
    topic = "sensors/data",
    payload = ByteString(sensorBytes),
    qos = QoS.EXACTLY_ONCE,
    retain = true,
))
// With MQTT 5.0 properties
client.publish("commands/response", responseJson,
    properties = PublishProperties(
        contentType = "application/json",
        messageExpiryInterval = 3600,
    ),
)

Default publish options
Set client-level defaults so you don't repeat QoS/retain on every call (similar to Ktor's defaultRequest {}):
val client = MqttClient("sensor") {
    defaultQos = QoS.AT_LEAST_ONCE
    defaultRetain = true
}
client.publish("sensors/temp", "22.5") // uses QoS 1 + retain

Subscribing and receiving messages
Messages arrive via the messages flow. Start collecting before subscribing to avoid missing messages:
// Collect all messages
launch { client.messages.collect { msg -> println("${msg.topic}: ${msg.payloadAsString()}") } }
// Filter by exact topic
launch { client.messagesForTopic("sensors/temp").collect { /* ... */} }
// Filter by wildcard pattern
launch { client.messagesMatching("sensors/+/temp").collect { /* ... */} }
// Subscribe with QoS
client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)
// Bulk subscribe
client.subscribe(mapOf("sensors/#" to QoS.AT_LEAST_ONCE, "commands/#" to QoS.EXACTLY_ONCE))

Connection state
Observe lifecycle transitions via connectionState:
client.connectionState.collect { state ->
    when (state) {
        is ConnectionState.Connected -> println("Online")
        is ConnectionState.Reconnecting -> println("Reconnecting...")
        is ConnectionState.Disconnected -> println("Offline")
        else -> { /* CONNECTING */ }
    }
}

Will messages
Configure a will message using the DSL block in the config builder:
val client = MqttClient("sensor") {
    will {
        topic = "sensors/status"
        payload("offline")
        qos = QoS.AT_LEAST_ONCE
        retain = true
    }
}

Logging
val client = MqttClient("sensor") {
    logger = MqttLogger.println()
    logLevel = MqttLogLevel.DEBUG
}

Automatic reconnection
Automatic reconnection is enabled by default. The client reconnects with exponential backoff and re-establishes its subscriptions automatically:
val client = MqttClient("sensor") {
    autoReconnect = true          // default
    reconnectBaseDelayMs = 1000   // initial delay
    reconnectMaxDelayMs = 30000   // max backoff
}

Enhanced authentication (§4.12)
For SASL-style challenge/response flows:
val client = MqttClient("secure-client") {
    authenticationMethod = "SCRAM-SHA-256"
    authenticationData = ByteString(initialResponse)
}
launch {
    client.authChallenges.collect { challenge ->
        client.sendAuthResponse(processChallenge(challenge))
    }
}

Cleanup
Always close the client when done to release resources:
client.close()

Or use the scoped use extension for automatic cleanup:

MqttClient("sensor").use(MqttEndpoint.parse("tcp://broker:1883")) { client ->
    client.subscribe("sensors/#", QoS.AT_LEAST_ONCE)
    client.publish("sensors/temp", "22.5")
    delay(10_000)
}

Thread safety
All public methods are suspend functions safe to call from any coroutine. Internal state is protected by mutexes, so concurrent calls to publish, subscribe, and disconnect are serialized rather than interleaved.
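
As a rough illustration of how a mutex serializes concurrent suspend calls, the sketch below uses the Mutex from kotlinx.coroutines. PacketIdAllocator and allocateConcurrently are hypothetical names invented for this example; they are not part of the library, and the client's actual internals may be organized differently.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for a piece of shared client state (not a library type).
class PacketIdAllocator {
    private val mutex = Mutex()
    private var next = 0

    // The read-modify-write runs under the lock, so every caller gets a distinct id.
    suspend fun allocate(): Int = mutex.withLock {
        next += 1
        next
    }
}

// Starts `callers` coroutines on a multi-threaded dispatcher and returns the ids they received.
suspend fun allocateConcurrently(callers: Int): List<Int> = coroutineScope {
    val allocator = PacketIdAllocator()
    (1..callers).map { async(Dispatchers.Default) { allocator.allocate() } }.awaitAll()
}

fun main() = runBlocking {
    val ids = allocateConcurrently(1_000)
    println("distinct ids: ${ids.toSet().size}") // prints "distinct ids: 1000"
}
```

Without the withLock call, two coroutines could read the same value of next and hand out duplicate ids; the lock turns the increment into a single serialized step.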
Parameters
Client configuration mapping to CONNECT packet fields (§3.1).
Coroutine scope for background jobs. Defaults to a new SupervisorJob on Dispatchers.Default. The client creates a child scope, so cancelling the provided scope also cancels all client operations.
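
The parent/child relationship described above can be sketched with plain kotlinx.coroutines. This is an assumption about how such a child scope might be derived, not the library's confirmed implementation; childScopeOf and cancellationPropagates are hypothetical helpers written for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: derives a supervised child scope whose Job is a child of the parent's Job.
fun childScopeOf(parent: CoroutineScope): CoroutineScope =
    CoroutineScope(parent.coroutineContext + SupervisorJob(parent.coroutineContext[Job]))

// Returns true if cancelling the provided scope also cancels work running in the child scope.
fun cancellationPropagates(): Boolean = runBlocking {
    val provided = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val clientScope = childScopeOf(provided)

    // Stands in for a long-running background job such as a keep-alive loop.
    val background = clientScope.launch { awaitCancellation() }

    provided.cancel()   // cancel the scope the caller passed in
    background.join()   // wait until the cancellation has reached the child
    background.isCancelled
}

fun main() {
    println("background cancelled: ${cancellationPropagates()}") // prints "background cancelled: true"
}
```

Because the child's SupervisorJob has the provided scope's Job as its parent, cancellation flows downward, while a failure in one background job does not cancel its siblings.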
Functions
Connect to the MQTT broker at the specified endpoint.
Gracefully disconnect from the broker.
Returns a Flow of messages filtered to a specific topic.
Returns a Flow of messages whose topic matches the given MQTT topicFilter.
Publish an MqttMessage to the broker.
Publish a message with a String payload and optional MQTT 5.0 properties.
Send an AUTH response to the broker during enhanced authentication (§4.12).
Subscribe to multiple topic filters with full subscription options.
Subscribe to multiple topic filters with per-topic QoS levels.
Subscribe to a single topic filter with the specified qos and MQTT 5.0 subscription options.
Unsubscribe from one or more topic filters.
Connect, execute block, then close the client (structured resource management).
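
For readers unfamiliar with MQTT wildcards, the matching that a filter such as "sensors/+/temp" or "sensors/#" implies can be sketched as follows. This follows the general MQTT topic filter rules ("+" matches exactly one level, "#" matches the remaining levels including the parent level, and a leading wildcard does not match topics that start with "$"); topicMatches is a hypothetical function for illustration and is not necessarily how messagesMatching is implemented.

```kotlin
// Hypothetical matcher illustrating MQTT topic filter semantics (not a library function).
fun topicMatches(filter: String, topic: String): Boolean {
    val filterLevels = filter.split('/')
    val topicLevels = topic.split('/')

    // A filter that starts with a wildcard must not match topics beginning with '$'.
    if (topic.startsWith('$') && (filterLevels[0] == "+" || filterLevels[0] == "#")) return false

    for ((i, level) in filterLevels.withIndex()) {
        when (level) {
            // '#' is only valid as the last level; it matches everything from here on,
            // including the parent level itself ("sensors/#" matches "sensors").
            "#" -> return i == filterLevels.lastIndex
            // '+' matches exactly one level, which must exist in the topic.
            "+" -> if (i >= topicLevels.size) return false
            // Any other level must match literally.
            else -> if (i >= topicLevels.size || topicLevels[i] != level) return false
        }
    }
    // No '#' was seen, so the topic must not have extra levels.
    return filterLevels.size == topicLevels.size
}

fun main() {
    println(topicMatches("sensors/+/temp", "sensors/kitchen/temp"))      // true
    println(topicMatches("sensors/+/temp", "sensors/kitchen/hall/temp")) // false
    println(topicMatches("sensors/#", "sensors"))                        // true
    println(topicMatches("#", "\$SYS/uptime"))                           // false
}
```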