feat: Implement structured MQTT messages with separate username and message

This commit refactors the chat application to transmit MQTT messages using a structured JSON format.
The  struct, leveraging , now encapsulates both the username and the message content, allowing them to be sent and received as distinct fields.

Key changes include:
- Added  and  dependencies to .
- Defined a  struct with  and  fields.
- Modified message publishing to serialize  instances to JSON.
- Updated message receiving to deserialize JSON payloads into  instances.
- Adjusted internal message storage and display logic to handle the separate username and message components.
- Ensured the connect message also uses the new structured format.
This commit is contained in:
2025-07-14 15:21:11 -04:00
parent b0f8e535d2
commit 549070d3e9
3 changed files with 82 additions and 15 deletions

34
Cargo.lock generated
View File

@@ -617,6 +617,38 @@ dependencies = [
"libc",
]
[[package]]
name = "serde"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.140"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -820,6 +852,8 @@ dependencies = [
"rand",
"ratatui",
"rumqttc",
"serde",
"serde_json",
"tokio",
"tokio-stream",
]

View File

@@ -10,3 +10,5 @@ rumqttc = "0.24.0"
tokio = { version = "1.38.0", features = ["full"] }
tokio-stream = "0.1.15"
rand = "0.8.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -9,17 +9,26 @@ use std::{io, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use rand::Rng;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct ChatMessage {
username: String,
message: String,
}
struct App {
messages: Vec<String>,
messages: Vec<(String, String)>,
input: String,
username: String,
}
impl App {
fn new(username: String) -> App {
let mut messages = Vec::new();
messages.push(("System".to_string(), format!("Welcome to the chat, {}!", username)));
App {
messages: Vec::new(),
messages,
input: String::new(),
username,
}
@@ -61,7 +70,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
async fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Result<()> {
let (tx, mut rx) = mpsc::channel::<String>(100);
let (tx, mut rx) = mpsc::channel::<(String, String)>(100);
let mut mqttoptions = MqttOptions::new(app.username.clone(), "172.16.0.3", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
@@ -69,14 +78,32 @@ async fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Re
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("chat/msg", QoS::AtMostOnce).await.unwrap();
// Publish connect message
let connect_message = ChatMessage {
username: app.username.clone(),
message: "has connected".to_string(),
};
let connect_payload = serde_json::to_string(&connect_message).unwrap();
client.publish("chat/msg", QoS::AtMostOnce, false, connect_payload.as_bytes()).await.unwrap();
let client_clone = client.clone();
tokio::spawn(async move {
while let Ok(notification) = eventloop.poll().await {
if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) = notification {
let message = String::from_utf8_lossy(&p.payload).to_string();
if tx.send(message).await.is_err() {
break;
let message_str = String::from_utf8_lossy(&p.payload).to_string();
match serde_json::from_str::<ChatMessage>(&message_str) {
Ok(chat_msg) => {
if tx.send((chat_msg.username, chat_msg.message)).await.is_err() {
break;
}
}
Err(_) => {
// Handle malformed messages
if tx.send(("System".to_string(), message_str)).await.is_err() {
break;
}
}
}
}
}
@@ -92,15 +119,19 @@ async fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Re
if let Event::Key(key) = event {
match key.code {
KeyCode::Enter => {
let message = app.input.drain(..).collect::<String>();
if message.starts_with("/nick ") {
let new_username = message.split_whitespace().nth(1).unwrap_or(&app.username).to_string();
let message_text = app.input.drain(..).collect::<String>();
if message_text.starts_with("/nick ") {
let new_username = message_text.split_whitespace().nth(1).unwrap_or(&app.username).to_string();
app.username = new_username;
app.messages.push(format!("Username changed to: {}", app.username));
app.messages.push(("System".to_string(), format!("Username changed to: {}", app.username)));
} else {
let formatted_message = format!("{}: {}", app.username, message);
let chat_message = ChatMessage {
username: app.username.clone(),
message: message_text,
};
let payload = serde_json::to_string(&chat_message).unwrap();
client_clone
.publish("chat/msg", QoS::AtMostOnce, false, formatted_message.as_bytes())
.publish("chat/msg", QoS::AtMostOnce, false, payload.as_bytes())
.await
.unwrap();
}
@@ -118,8 +149,8 @@ async fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Re
}
}
}
Some(message) = rx.recv() => {
app.messages.push(message);
Some((username, message)) = rx.recv() => {
app.messages.push((username, message));
}
else => {
break;
@@ -140,7 +171,7 @@ fn ui(f: &mut Frame, app: &App) {
let messages: Vec<ListItem> = app
.messages
.iter()
.map(|m| ListItem::new(m.as_str()))
.map(|(username, message)| ListItem::new(format!("{}: {}", username, message)))
.collect();
let messages = List::new(messages)
.block(Block::default().borders(Borders::ALL).title("Messages"));