85·Web 开发高级

WebSocket 实时通信

WebSocket 实时通信

学习目标

  1. 掌握 WebSocket 连接
  2. 理解消息广播
  3. 实现聊天室

核心概念

基本 WebSocket

[dependencies]
axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    Router, routing::get,
};

/// HTTP handler for the WebSocket route: completes the upgrade handshake and
/// passes the resulting socket to `handle_socket`.
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

/// Echo loop for a single WebSocket connection.
///
/// Each text frame is logged and sent back prefixed with `Echo: `. The loop
/// ends when the stream is exhausted, a receive error occurs, the client sends
/// a close frame, or a reply can no longer be delivered. All other frame kinds
/// are ignored.
async fn handle_socket(mut socket: WebSocket) {
    while let Some(Ok(msg)) = socket.recv().await {
        match msg {
            Message::Text(text) => {
                println!("收到: {}", text);
                let reply = Message::Text(format!("Echo: {}", text).into());
                // A failed send means the peer is gone; finish the session
                // quietly instead of panicking the connection task.
                if socket.send(reply).await.is_err() {
                    break;
                }
            }
            Message::Close(_) => break,
            _ => {}
        }
    }
}

/// Builds the application router with the WebSocket endpoint mounted at `/ws`.
fn app() -> Router {
    let ws_route = get(ws_handler);
    Router::new().route("/ws", ws_route)
}

广播聊天室

use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    extract::State,
    response::IntoResponse,
    Router, routing::get,
};
use tokio::sync::broadcast;
use futures::{SinkExt, StreamExt};

/// Shared state handed to every WebSocket handler through axum's `State` extractor.
struct AppState {
    /// Sending half of the chat broadcast channel. Handlers publish incoming
    /// text on it and call `subscribe()` on it to obtain their own receiver.
    tx: broadcast::Sender<String>,
}

async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<std::sync::Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, state: std::sync::Arc<AppState>) {
    let mut rx = state.tx.subscribe();

    let (mut sender, mut receiver) = socket.split();

    // 接收消息并广播
    let tx = state.tx.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(Message::Text(text))) = receiver.next().await {
            let _ = tx.send(text.to_string());
        }
    });

    // 发送广播消息给客户端
    let mut send_task = tokio::spawn(async move {
        while let Ok(msg) = rx.recv().await {
            if sender.send(Message::Text(msg.into())).await.is_err() {
                break;
            }
        }
    });

    tokio::select! {
        _ = &mut recv_task => send_task.abort(),
        _ = &mut send_task => recv_task.abort(),
    }
}

/// Entry point: creates the broadcast channel, builds the router with the
/// shared state, and serves it on port 3000 on all interfaces.
#[tokio::main]
async fn main() {
    // The initial receiver is discarded; each connection subscribes on its own.
    let (tx, _) = broadcast::channel(100);
    let state = std::sync::Arc::new(AppState { tx });

    let app = Router::new()
        .route("/ws", get(ws_handler))
        .with_state(state);

    // Startup failures are unrecoverable here, so panic with a message that
    // says which step failed.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .expect("failed to bind 0.0.0.0:3000");
    axum::serve(listener, app)
        .await
        .expect("server terminated with an error");
}

小结

| 操作 | API |
| --- | --- |
| 升级连接 | `WebSocketUpgrade::on_upgrade` |
| 接收消息 | `socket.recv().await` |
| 发送消息 | `socket.send(Message::Text(...)).await` |
| 广播 | `broadcast::channel` |

练习编辑器

rust
Loading...