• Why Github?
  • Team
  • Enterprise
  • Explore
  • Marketplace
  • Pricing
Sign inSign up
Watch996
Star102.4k
Fork61.8k
Tag: rust
Switch branches/tags
K / 在 Rust 中用 Warp 创建异步 CRUD Web 服务.md
Go to Mobile Clone
加载中...
到移动设备上浏览
520 lines 40.64 KB
First commit on 18 Aug 2020

    翻译自:Create an async CRUD web service in Rust with warp

    转载请注明出处:http://www.telihai.com/archives/9161/

    在此博客的上一篇文章中,我们介绍了如何使用 Actix 和 Diesel 创建 Rust Web 服务。这次,我们将使用 warp Web 框架和 tokio-postgres 创建一个轻量级的,完全异步的 Web 服务。

    Warp 基于著名并久经考验的 hyper HTTP 库,它提供了强大而快速的基础。warp 的另一个很酷的功能是它的过滤器系统,这是一种构成 Web 应用程序的功能方法。

    本质上,过滤器只是可以组合在一起的功能。在扭曲中,它们用于从路由,中间件到将值传递给处理程序的所有过程。请参阅 warp 的发布帖子,以进行更深入的了解。在本教程中,您将看到一些实际使用的过滤器,并且我们将演示如何编写自己的过滤器。

    建立

    接下来,您所需要的只是一个相当新的 Rust 安装(1.39+)和一种运行 Postgres 数据库的方法(例如 Docker)。

    首先,创建您的测试项目。

    cargo new warp-example
    cd warp-example
    

    接下来,编辑 Cargo.toml 文件并添加所需的依赖项。

    [dependencies]
    tokio = { version = "0.2", features = ["macros"] }
    warp = "0.2"
    mobc-postgres = { version = "0.5", features = ["with-chrono-0_4"] }
    mobc = "0.5"
    serde = {version = "1.0", features = ["derive"] }
    serde_derive = "1.0"
    serde_json = "1.0"
    thiserror = "1.0"
    chrono = { version = "0.4", features = ["serde"] }
    

    如果您想知道所有这些意味着什么:

    • tokio 是我们的异步运行时,我们需要执行期货
    • warp 是我们的网络框架
    • mobc / mobc-postgres 代表数据库连接的异步连接池
    • serde 用于序列化和反序列化对象(例如 to/from JSON)
    • thiserror 是我们将用于错误处理的实用程序库
    • chrono 代表时间和日期实用程序

    为了避免将所有内容都转储到一个文件中,让我们在中添加一些结构 main.rs

    mod data;
    mod db;
    mod error;
    mod handler;
    

    对于每个模块,我们还将创建一个文件(例如 data.rs)。

    第一步,创建一个运行在端口 8000 上的 Web 服务器,其 /health 端点返回一个 200 OK

    main.rs 中添加:

    #[tokio::main]
    async fn main() {
        let health_route = warp::path!("health")
            .map(|| StatusCode::OK);
    
        let routes = health_route
            .with(warp::cors().allow_any_origin());
    
        warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
    }
    

    在上面的代码段中,我们定义了匹配 GET /health 并返回 200 OKhealth_route。然后,为演示如何添加中间件,使用 warp::cors 过滤器设置此路由,以允许从任何来源调用该服务。最后,用 warp::serve 运行服务。

    使用 cargo runcURL 启动并测试应用程序。

    curl http://localhost:8000/health
    

    将 Web 服务连接到数据库

    到目前为止,一切都很好!下一步是设置您的 Postgres 数据库,并在 /health handler 中添加对数据库连接的检查。

    要启动 Postgres 数据库,可以使用 Docker 或本地安装 Postgres。使用 Docker,您可以简单地执行:

    docker run -p 7878:5432 -d postgres:9.6.12
    

    此命令在 7878 端口用 postgres 用户名启动一个 Postgres 数据库 postgres,无密码。

    现在您有了一个正在运行的数据库,下一步是从 warp 应用程序与该数据库进行连接。为此,您可以使用异步连接池 mobc 产生多个数据库连接,并在请求之间重用它们。

    进行设置仅需几行。首先,在中定义一些便利类型 main.rs

    use mobc::{Connection, Pool};
    use mobc_postgres::{tokio_postgres, PgConnectionManager};
    use tokio_postgres::NoTls;
    
    type DBCon = Connection<PgConnectionManager<NoTls>>;
    type DBPool = Pool<PgConnectionManager<NoTls>>;
    

    下一步,在 db.rs 中创建连接池。

    use crate::{DBCon, DBPool};
    use mobc_postgres::{tokio_postgres, PgConnectionManager};
    use tokio_postgres::{Config, Error, NoTls};
    use std::fs;
    use std::str::FromStr;
    use std::time::Duration;
    
    const DB_POOL_MAX_OPEN: u64 = 32;
    const DB_POOL_MAX_IDLE: u64 = 8;
    const DB_POOL_TIMEOUT_SECONDS: u64 = 15;
    
    pub fn create_pool() -> std::result::Result<DBPool, mobc::Error<Error>> {
        let config =
            Config::from_str("postgres://postgres@127.0.0.1:7878/postgres")?;
    
        let manager = PgConnectionManager::new(config, NoTls);
        Ok(Pool::builder()
            .max_open(DB_POOL_MAX_OPEN)
            .max_idle(DB_POOL_MAX_IDLE)
            .get_timeout(Some(Duration::from_secs(DB_POOL_TIMEOUT_SECONDS)))
            .build(manager))
    }
    

    create_pool 函数仅创建一个 Postgres 连接字符串,并为连接池定义一些参数,例如最小和最大打开连接以及连接超时。

    下一步是简单地构建池并返回它。此时,实际上没有创建任何数据库连接,仅创建了池。

    由于我们已经在这里,所以我们还创建一个用于在启动时初始化数据库的函数。

    const INIT_SQL: &str = "./db.sql";
    
    pub async fn get_db_con(db_pool: &DBPool) -> Result<DBCon> {
        db_pool.get().await.map_err(DBPoolError)
    }
    
    pub async fn init_db(db_pool: &DBPool) -> Result<()> {
        let init_file = fs::read_to_string(INIT_SQL)?;
        let con = get_db_con(db_pool).await?;
        con.batch_execute(init_file.as_str()).await.map_err(DBInitError)?;
        Ok(())
    }
    

    使用该 get_db_con utility,我们尝试从池中获取新的数据库连接。现在不用担心错误——我们稍后将讨论错误处理。

    要在启动时从 db.sql 文件创建数据库表,要调用 init_db 函数。这会将文件读入字符串并执行 query。

    初始化查询如下所示:

    CREATE TABLE IF NOT EXISTS todo
    (
        id SERIAL PRIMARY KEY NOT NULL,
        name VARCHAR(255),
        created_at timestamp with time zone DEFAULT (now() at time zone 'utc'),
        checked boolean DEFAULT false
    );
    

    回到我们的主函数,现在我们可以调用数据库设置函数。

    let db_pool = db::create_pool().expect("database pool can be created");
    
    db::init_db(&db_pool).await.expect("database can be initialized");
    

    如果任何数据库设置代码失败,我们可以抛出 panic,因为继续下去没有意义。

    假设它没有失败,那么现在是时候解决本节的主要目标:向 /health handler 添加数据库检查。

    为此,我们需要一种将传递 db_pool 给 handler 的方法。这是写我们的第一个 warp filter 的绝好机会。

    main,rs 中,添加以下 with_db 过滤器。

    use std::convert::Infallible;
    use warp::{Filter, Rejection};
    
    fn with_db(
        db_pool: DBPool
    ) -> impl Filter<Extract = (DBPool,), Error = Infallible> + Clone {
        warp::any().map(move || db_pool.clone())
    }
    

    这是一个简单的提取过滤器。上面的意思是,对于任何 route(any()) ,您都希望提取一个 DBPool 并将其传递。

    如果您有兴趣了解有关过滤器的更多信息,文档 会很有帮助。

    然后使用 .and() 操作符将过滤器简单地添加到 handler 定义中:

    let health_route = warp::path!("health")
        .and(with_db(db_pool.clone()))
        .and_then(handler::health_handler);
    

    移动 health handler 到 handler.rs 文件并添加数据库检查。

    use crate::{db, DBPool};
    use warp::{http::StatusCode, reject, Reply, Rejection};
    
    pub async fn health_handler(
        db_pool: DBPool
    ) -> std::result::Result<impl Reply, Rejection> {
        let db = db::get_db_con(&db_pool)
                .await
                .map_err(|e| reject::custom(e))?;
    
        db.execute("SELECT 1", &[])
                .await
                .map_err(|e| reject::custom(DBQueryError(e)))?;
        Ok(StatusCode::OK)
    }
    

    现在,handler 收到一个 DBPool,您可以使用它来建立连接并对数据库发起健全性检查查询。

    如果检查期间发生错误,使用 reject::custom 来返回自定义错误。

    接下来,按照承诺,让我们看一下使用 warp 进行错误处理。

    处理错误

    干净的错误处理是任何 Web 应用程序中最重要且经常被忽视的事情之一。目的是在不泄漏内部细节的情况下为 API 使用者提供有用的错误信息。

    我们将使用 thiserror 库方便地创建自定义错误。

    error.rs 中定义一个 Error 枚举,该枚举具有所有错误的变体。

    use mobc_postgres::tokio_postgres;
    use thiserror::Error;
    
    #[derive(Error, Debug)]
    pub enum Error {
        #[error("error getting connection from DB pool: {0}")]
        DBPoolError(mobc::Error<tokio_postgres::Error>),
        #[error("error executing DB query: {0}")]
        DBQueryError(#[from] tokio_postgres::Error),
        #[error("error creating table: {0}")]
        DBInitError(tokio_postgres::Error),
        #[error("error reading file: {0}")]
        ReadFileError(#[from] std::io::Error),
    }
    

    如果我们可以找到将这些错误和其他错误转换为有意义的 API 响应的方法,则可以简单地从处理程序返回我们的自定义错误之一,而 caller 将自动获得正确的错误消息和状态代码。

    为此,我们将使用 warp 的概念 rejections

    首先,将便利类型添加到 main.rs for fallible results。

    type Result<T> = std::result::Result<T, warp::Rejection>;
    

    接下来,通过实现 Reject trait,确保通过 warp 将您的自定义错误识别为 rejections。

    impl warp::reject::Reject for Error {}
    

    定义拒绝处理程序,以以下形式将 rejections 转换为良好的错误响应。

    #[derive(Serialize)]
    struct ErrorResponse {
        message: String,
    }
    

    这样的拒绝处理程序可能看起来像这样:

    pub async fn handle_rejection(
        err: Rejection
    ) -> std::result::Result<impl Reply, Infallible> {
        let code;
        let message;
    
        if err.is_not_found() {
            code = StatusCode::NOT_FOUND;
            message = "Not Found";
        } else if let Some(_) =
            err.find::<warp::filters::body::BodyDeserializeError>()
        {
            code = StatusCode::BAD_REQUEST;
            message = "Invalid Body";
        } else if let Some(e) = err.find::<Error>() {
            match e {
                Error::DBQueryError(_) => {
                    code = StatusCode::BAD_REQUEST;
                    message = "Could not Execute request";
                }
                _ => {
                    eprintln!("unhandled application error: {:?}", err);
                    code = StatusCode::INTERNAL_SERVER_ERROR;
                    message = "Internal Server Error";
                }
            }
        } else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
            code = StatusCode::METHOD_NOT_ALLOWED;
            message = "Method Not Allowed";
        } else {
            eprintln!("unhandled error: {:?}", err);
            code = StatusCode::INTERNAL_SERVER_ERROR;
            message = "Internal Server Error";
        }
    
        let json = warp::reply::json(&ErrorResponse {
            message: message.into(),
        });
    
        Ok(warp::reply::with_status(json, code))
    }
    

    基本上,我们从处理程序那里得到一个 Rejection。然后,根据错误的类型,我们设置响应的消息和状态代码。

    如您所见,我们既可以处理一般错误,例如 not found,也可以处理特定问题,例如解析 request 的 JSON body 时遇到的错误。

    后备处理程序将一般 500 错误返回给用户并记录错误原因,因此您可以在必要时进行调查而不会泄漏内部信息。

    在路由定义中,只需使用 recover filter 添加这个 error handler:

    let routes = health_route
        .with(warp::cors().allow_any_origin())
        .recover(error::handle_rejection);
    

    完美!我们已经取得了很大的进步。剩下的就是为待办事项应用程序实际实现 CRUD 处理程序。

    实施 CRUD API

    现在,我们有一个运行并连接到数据库的 Web 服务器,以及一种优雅地处理错误的方法。我们的应用程序唯一缺少的是实际的应用程序逻辑。

    我们将实现四个处理程序:

    1. GET /todo/?search={searchString} 列出所有待办事项,并通过可选的搜索字符串过滤
    2. POST /todo/ 创建一个待办事项
    3. PUT /todo/{id} 用给定的 ID 更新待办事项
    4. DELETE /todo/{id} 删除具有给定 ID 的待办事项

    第一步是创建待办事项,因为如果没有待办事项,我们将无法方便地测试其他端点。

    db.rs 中,添加用于将待办事项插入数据库的功能。

    const TABLE: &str = "todo";
    
    pub async fn create_todo(db_pool: &DBPool, body: TodoRequest) -> Result<Todo> {
        let con = get_db_con(db_pool).await?;
        let query = format!("INSERT INTO {} (name) VALUES ($1) RETURNING *", TABLE);
        let row = con
            .query_one(query.as_str(), &[&body.name])
            .await
            .map_err(DBQueryError)?;
        Ok(row_to_todo(&row))
    }
    
    fn row_to_todo(row: &Row) -> Todo {
        let id: i32 = row.get(0);
        let name: String = row.get(1);
        let created_at: DateTime<Utc> = row.get(2);
        let checked: bool = row.get(3);
        Todo {
            id,
            name,
            created_at,
            checked,
        }
    }
    

    这将从连接池确立一个连接,发送插入查询,并将返回的行转换为 Todo

    为此,您需要一些在 data.rs 中定义一些数据对象。

    use chrono::prelude::*;
    use serde_derive::{Deserialize, Serialize};
    
    #[derive(Deserialize)]
    pub struct Todo {
        pub id: i32,
        pub name: String,
        pub created_at: DateTime<Utc>,
        pub checked: bool,
    }
    
    #[derive(Deserialize)]
    pub struct TodoRequest {
        pub name: String,
    }
    
    #[derive(Deserialize)]
    pub struct TodoUpdateRequest {
        pub name: String,
        pub checked: bool,
    }
    
    #[derive(Serialize)]
    pub struct TodoResponse {
        pub id: i32,
        pub name: String,
        pub checked: bool,
    }
    
    impl TodoResponse {
        pub fn of(todo: Todo) -> TodoResponse {
            TodoResponse {
                id: todo.id,
                name: todo.name,
                checked: todo.checked,
            }
        }
    }
    

    Todo 结构实质上是数据库表的镜像。tokio-postgres 可以使用 chrono 的 DateTime<Utc> 来映射转换时间戳。其他结构是您期望用于创建和更新待办事项的 JSON 请求,以及您在 list、update 和 create 处理程序中发送回的响应。

    现在,您可以在 handler.rs 中创建实际的创建处理程序。

    pub async fn create_todo_handler(
        body: TodoRequest, db_pool: DBPool
    ) -> Result<impl Reply> {
        Ok(json(&TodoResponse::of(
            db::create_todo(&db_pool, body).await.map_err(|e| reject::custom(e))?,
        )))
    }
    

    在这种情况下,您会同时得到传递给 handler 的一个从 request body 解析的 TodoRequestdb_pool。到达那里后,只需调用数据库函数,将其映射到 TodoResponse,然后使用 warp 的 reply::json 助手将其序列化为 JSON。

    如果发生错误,请使用warp's处理它,这使您可以根据我们的自定义错误类型创建拒绝项。reject::custom

    唯一缺少的是 main.rs 中的路由定义。

    let todo = warp::path("todo");
    let todo_routes = todo
        .and(warp::post())
        .and(warp::body::json())
        .and(with_db(db_pool.clone()))
        .and_then(handler::create_todo_handler));
    
    let routes = health_route
        .or(todo_routes)
        .with(warp::cors().allow_any_origin())
        .recover(error::handle_rejection);
    

    You’ll use warp::path at /todo/ for several routes. 然后,使用 warp 的过滤器,组成您的创建处理程序。

    添加 post 方法,明确说明您需要一个 JSON body,然后使用 with_db 过滤器表示需要数据库访问权限。最后,通过告诉路由使用哪个处理程序来完成该操作。

    然后,所有这些都将与 or 操作符一起传递到路由。

    使用以下命令对其进行测试。

    curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Some Todo"}'
    
    {"id":1,"name":"Some Todo","checked":false}
    

    太棒了!现在您知道了它的工作原理,您可以一次完成其他三个处理程序。同样,首先添加数据库助手。

    pub async fn fetch_todos(
        db_pool: &DBPool, search: Option<String>
    ) -> Result<Vec<Todo>> {
        let con = get_db_con(db_pool).await?;
        let where_clause = match search {
            Some(_) => "WHERE name like $1",
            None => "",
        };
        let query = format!(
            "SELECT {} FROM {} {} ORDER BY created_at DESC",
            SELECT_FIELDS, TABLE, where_clause
        );
        let q = match search {
            Some(v) => con.query(query.as_str(), &[&v]).await,
            None => con.query(query.as_str(), &[]).await,
        };
        let rows = q.map_err(DBQueryError)?;
    
        Ok(rows.iter().map(|r| row_to_todo(&r)).collect())
    }
    
    pub async fn update_todo(
        db_pool: &DBPool,
        id: i32,
        body: TodoUpdateRequest
    ) -> Result<Todo> {
        let con = get_db_con(db_pool).await?;
        let query = format!(
            "UPDATE {} SET name = $1, checked = $2 WHERE id = $3 RETURNING *",
            TABLE
        );
        let row = con
            .query_one(query.as_str(), &[&body.name, &body.checked, &id])
            .await
            .map_err(DBQueryError)?;
        Ok(row_to_todo(&row))
    }
    
    pub async fn delete_todo(db_pool: &DBPool, id: i32) -> Result<u64> {
        let con = get_db_con(db_pool).await?;
        let query = format!("DELETE FROM {} WHERE id = $1", TABLE);
        con.execute(query.as_str(), &[&id]).await.map_err(DBQueryError)
    }
    

    这些基本与 create 情况相同,不同之处在于 fetch_todos,如果有搜索词,您将创建一个不同的查询。

    接下来让我们看一下处理程序。

    #[derive(Deserialize)]
    pub struct SearchQuery {
        search: Option<String>,
    }
    
    pub async fn list_todos_handler(
        query: SearchQuery,
        db_pool: DBPool
    ) -> Result<impl Reply> {
        let todos = db::fetch_todos(&db_pool, query.search)
            .await
            .map_err(|e| reject::custom(e))?;
        Ok(json::<Vec<_>>(
            &todos.into_iter().map(|t| TodoResponse::of(t)).collect(),
        ))
    }
    
    pub async fn update_todo_handler(
        id: i32,
        body: TodoUpdateRequest,
        db_pool: DBPool,
    ) -> Result<impl Reply> {
        Ok(json(&TodoResponse::of(
            db::update_todo(&db_pool, id, body)
                .await
                .map_err(|e| reject::custom(e))?,
        )))
    }
    
    pub async fn delete_todo_handler(
        id: i32,
        db_pool: DBPool
    ) -> Result<impl Reply> {
        db::delete_todo(&db_pool, id).await.map_err(|e| reject::custom(e))?;
        Ok(StatusCode::OK)
    }
    

    同样,您会看到一些熟悉的东西。如果一切顺利,每个处理程序都会调用数据库层,处理错误,并为调用方创建一个返回值。

    一个有趣的例外是 list_todos_handler,前面提到的查询参数已传入,已经解析为 SearchQuery

    这就是您处理 warp 中的查询参数的方式。如果您有更多具有不同类型的参数,则只需将它们添加到 SearchQuery 结构中,它们就会被自动解析。

    让我们进行所有连接,然后进行一项最终测试。

    let todo_routes = todo
        .and(warp::get())
        .and(warp::query())
        .and(with_db(db_pool.clone()))
        .and_then(handler::list_todos_handler)
        .or(todo
            .and(warp::post())
            .and(warp::body::json())
            .and(with_db(db_pool.clone()))
            .and_then(handler::create_todo_handler))
        .or(todo
            .and(warp::put())
            .and(warp::path::param())
            .and(warp::body::json())
            .and(with_db(db_pool.clone()))
            .and_then(handler::update_todo_handler))
        .or(todo
            .and(warp::delete())
            .and(warp::path::param())
            .and(with_db(db_pool.clone()))
            .and_then(handler::delete_todo_handler));
    

    这里有一些新东西。要将查询参数获取到 list handler,您需要使用 warp::query()。要获取 id 参数用于 update 和 delete,请使用 warp::path::param()。使用 or 将不同的路由合并,即可设置 todo 路由。

    在 warp 中创建和构造路由的方法有很多。它只是被组合在一起的函数,因此处理过程非常灵活。有关更多示例,请查看官方文档。

    现在让我们测试整个事情。

    首先,查看错误处理是否真正起作用。

    curl -v -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"wrong": "Some Todo"}'
    
    HTTP/1.1 400 Bad Request
    {"message":"Invalid Body"}
    

    接下来,添加另一个 Todo,立即将其 check,然后尝试更新不存在的 Todo

    curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Done Todo"}'
    
    {"id":2,"name":"Done Todo","checked":false}
    
    
    curl -X PUT 'http://localhost:8000/todo/2' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
    
    {"id":2,"name":"Done Todo","checked":true}
    
    
    curl -X PUT 'http://localhost:8000/todo/2000' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
    
    {"message":"Could not Execute request"}
    

    到目前为止,一切都很好!现在列出它们,过滤列表,删除其中之一,然后再次列出。

    curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
    
    [{"id":1,"name":"Some Todo","checked":false},{"id":2,"name":"Done Todo","checked":true}]
    
    
    curl -X GET 'http://localhost:8000/todo/?search=Done%20Todo' -H 'Content-Type: application/json'
    
    [{"id":2,"name":"Done Todo","checked":true}]
    
    
    curl -v -X DELETE 'http://localhost:8000/todo/2' -H 'Content-Type: application/json'
    
    HTTP/1.1 200 OK
    
    
    curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
    
    [{"id":1,"name":"Some Todo","checked":false}]
    

    完善!一切正常。您可以在 GitHub 上找到此示例的完整代码。

    结论

    太好了!在本教程中,我们演示了如何使用 warp 和 tokio-postgres 创建完全异步的 Web 应用程序。我们设法在 300 行以下的 Rust 代码中对错误进行了错误处理,从而获得了基于数据库的基本 CRUD Web 服务。不是太糟糕!

    warp 似乎很有前途;它轻巧,现代,快速,我喜欢功能方法。该框架还很年轻,还没有经受住时间的考验,但是到目前为止,它看起来很棒。