Skip to content

A Tokio-based MQTT communication library built on top of mqtt-protocol-core. It supports both client and server (broker) implementations.

License

Notifications You must be signed in to change notification settings

redboltz/mqtt-endpoint-tokio

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

73 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

mqtt-endpoint-tokio

Crates.io Documentation CI codecov License: MIT

A high-performance async MQTT client/server library for Rust with tokio, supporting MQTT v5.0 and v3.1.1 with TCP, TLS, WebSocket, and QUIC transports.

Features

  • Full MQTT Protocol Support: MQTT v5.0 and v3.1.1 compatible
  • Multiple Transport Layers: TCP, TLS, WebSocket, and QUIC support
  • Async/Await: Built on tokio for high-performance async I/O
  • Generic Packet ID Support: Supports both u16 and u32 packet IDs for broker clustering
  • Sans-I/O Protocol Core: Uses mqtt-protocol-core for protocol implementation
  • Type Safety: Comprehensive type system for MQTT packet handling
  • Connection Management: Robust connection lifecycle management
  • Error Handling: Comprehensive error types for all failure modes

Transport Support

TCP Transport

Basic TCP connections for standard MQTT communication.

TLS Transport

Secure connections with full TLS support using rustls.

WebSocket Transport

WebSocket connections for web-based MQTT clients, supporting both:

  • Plain WebSocket (ws://)
  • Secure WebSocket over TLS (wss://)

QUIC Transport

High-performance QUIC connections for low-latency MQTT communication using quinn.

Unix Domain Socket Transport

Local inter-process communication on Unix-based systems (Linux, macOS, BSD). Ideal for scenarios where the client and broker run on the same machine, offering better performance than TCP/IP sockets.

Quick Start

Add this to your Cargo.toml:

[dependencies]
mqtt-endpoint-tokio = "0.6"

Optional Features

The library provides optional features to reduce dependencies based on your transport needs:

# All transports enabled (default)
mqtt-endpoint-tokio = "0.6"

# Only TCP transport (minimal dependencies)
mqtt-endpoint-tokio = { version = "0.6", default-features = false }

# TCP + TLS
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["tls"] }

# TCP + WebSocket
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["ws"] }

# TCP + QUIC (includes TLS)
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["quic"] }

# Custom combination
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["tls", "ws"] }

Available Features:

  • tls - TLS transport support (enabled by default)
  • ws - WebSocket transport support (enabled by default)
  • quic - QUIC transport support (enabled by default, requires tls)
  • unix-socket - Unix domain socket transport support (enabled by default, Unix systems only)
  • tracing - Enable tracing support for debugging

Default features: ["tls", "ws", "quic", "unix-socket"]

Basic Client Example

use mqtt_endpoint_tokio::mqtt_ep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint
    let endpoint = mqtt_ep::endpoint::Endpoint::new(mqtt_ep::Version::V5_0);

    // Connect to broker
    let tcp_stream = mqtt_ep::transport::connect_helper::connect_tcp("127.0.0.1:1883", None).await?;
    let transport = mqtt_ep::transport::TcpTransport::from_stream(tcp_stream);
    endpoint.attach(transport, mqtt_ep::endpoint::Mode::Client).await?;

    // Send CONNECT packet
    let connect = mqtt_ep::packet::v5_0::Connect::builder()
        .client_id("rust-client")
        .unwrap()
        .keep_alive(60)
        .clean_start(true)
        .build()
        .unwrap();

    endpoint.send(connect).await?;

    // Receive CONNACK
    let packet = endpoint.recv().await?;
    println!("Received: {packet:?}");

    Ok(())
}

WebSocket Example

use mqtt_endpoint_tokio::mqtt_ep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect via WebSocket
    let ws_stream = mqtt_ep::transport::connect_helper::connect_tcp_ws(
        "127.0.0.1:8080",
        "/mqtt",
        None,
        None
    ).await?;

    let transport = mqtt_ep::transport::WebSocketTransport::from_tcp_client_stream(ws_stream.into_inner());
    // ... rest of MQTT communication

    Ok(())
}

TLS Example

use mqtt_endpoint_tokio::mqtt_ep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect via TLS
    let tls_stream = mqtt_ep::transport::connect_helper::connect_tcp_tls(
        "broker.example.com:8883",
        "broker.example.com",
        None,
        None
    ).await?;

    let transport = mqtt_ep::transport::TlsTransport::from_stream(tls_stream);
    // ... rest of MQTT communication

    Ok(())
}

QUIC Example

use mqtt_endpoint_tokio::mqtt_ep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect via QUIC
    let (send_stream, recv_stream) = mqtt_ep::transport::connect_helper::connect_quic(
        "127.0.0.1:14567",
        "localhost",
        None,
        None,
        None
    ).await?;

    let transport = mqtt_ep::transport::QuicTransport::from_streams(send_stream, recv_stream);
    // ... rest of MQTT communication

    Ok(())
}

Unix Domain Socket Example (Unix systems only)

use mqtt_endpoint_tokio::mqtt_ep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect via Unix domain socket
    let unix_stream = mqtt_ep::transport::connect_helper::connect_unix(
        "/tmp/mqtt.sock",
        None
    ).await?;

    let transport = mqtt_ep::transport::UnixStreamTransport::from_stream(unix_stream);
    // ... rest of MQTT communication

    Ok(())
}

Architecture

This library is built on top of mqtt-protocol-core, which provides the Sans-I/O MQTT protocol implementation. The mqtt-endpoint-tokio layer adds:

  • Tokio-based async I/O
  • Transport layer abstractions
  • Connection management
  • Error handling integration

Generic Packet ID Support

The library supports both standard u16 packet IDs and extended u32 packet IDs for broker clustering scenarios:

use mqtt_endpoint_tokio::mqtt_ep;

// Standard u16 packet IDs
type StandardEndpoint = mqtt_ep::endpoint::Endpoint<mqtt_ep::role::Client>;

// Extended u32 packet IDs for broker clustering
type ExtendedEndpoint = mqtt_ep::endpoint::GenericEndpoint<mqtt_ep::role::Client, u32>;

Error Handling

Comprehensive error types provide detailed information about failures:

use mqtt_endpoint_tokio::mqtt_ep;

match endpoint.send(packet).await {
    Ok(()) => println!("Packet sent"),
    Err(mqtt_ep::connection_error::ConnectionError::NotConnected) => println!("Need to connect first"),
    Err(mqtt_ep::connection_error::ConnectionError::Transport(e)) => println!("Network error: {e}"),
    Err(mqtt_ep::connection_error::ConnectionError::Mqtt(e)) => println!("Protocol error: {e:?}"),
    Err(e) => println!("Other error: {e}"),
}

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

About

A Tokio-based MQTT communication library built on top of mqtt-protocol-core. It supports both client and server (broker) implementations.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 2

  •  
  •