阿里云服务器web攻击 Tokio可以说是rust中最热门的库,对于异步与并发进行了很好的支持。大多数基于rust的开源框架都使用到了Tokio,因此在介绍这些实现开源框架时经常会被···
阿里云服务器web攻击
Tokio可以说是rust中最热门的库,对于异步与并发进行了很好的支持。大多数基于rust的开源框架都使用到了Tokio,因此在介绍这些实现开源框架时经常会被问到:底层的异步和并发是怎么实现的?我只能回答:底层的异步和并发都是由Tokio控制的。这显然不是一个令人满意的回答。因此本文章将对于Tokio的基本方法和底层逻辑进行分析。
概述
一句话概括
Tokio 可以理解成一个任务池和一个调度器,负责把所有在任务池中的任务调度运行起来。
更具体一点,Tokio 可以类比为一个异步操作系统:
Tokio 的优势主要体现在以下方面:
高效通过内部的优化机制(调度算法、无锁队列与内存池管理等)与 rust 的语言优势,Tokio 效率较高,在早期的实验中,官方给出了性能对比图:
通用在 rust 发展之初,社区出现了很多运行时库,但是,大浪淘沙,随着时间的流逝,Tokio 越来越亮眼,无论是性能、功能还是社区、文档,它在各个方面都异常优秀,时至今日,可以说已成为事实上的标准。新出现的rust运行时库(例如 Bytedance 的 monoio)宣传性能优于 Tokio,但还是雷声大雨点小,没有被广泛应用。
Tokio 的身影遍布在各种类型的 rust 库中,例如HTTP库(Hyper)、Web框架(Axum / Warp)、gRPC(Tonic)、TLS库(Rustls)、数据库支持(SeaORM)等。同时各大厂商也广泛使用,例如AWS、Azure、Google等。
Tokio 更加适合频繁切换的场景,例如网络服务、微服务、代理、数据库连接池、实时通信系统等。而不适合并行计算或密集计算等场景。
因此,如果需要使用 rust 中的高性能异步并发,且对于 Tokio 内部工作原理不敏感,看到这里放心使用就好了。后文将从rust语言的异步来着手,分析Tokio的架构以及具体调度的生命周期,最后分析与Nginx的对比以未来的一些方向。
rust 语言的异步
如何构建云服务器
对 Future/async/await 非常熟悉可以跳过。
Golang/Nodejs等语言的异步内置于语言本身,做了很好的封装且开箱即用,虽然能够简化使用但不灵活无法更改。rust作为系统级语言,并不想把异步的具体实现单一化与局限化,因此在rust std中只实现了异步的基本功能与框架(例如 Future/async/await 等),而把异步调度进行了开放,由第三方库来具体实现。
Future
Future 是 Rust 异步编程的核心抽象,它是一个状态机,通过多次 poll 推进其状态,直到完成。它与 Waker、Runtime、I/O 驱动紧密配合,构成了整个非阻塞异步系统的基础。
Future 其实就是一个trait,定义如下:
pubtraitFuture{typeOutput;fnpoll(self: Pin<&mutSelf>, cx: &mutContext<_>) -> Poll; }Future 与 任务(task)的区别: Future 是 rust 原生支持的异步 trait,许多第三方异步库在此基础上将 Future 封装为 task 用来完成调度。
async
通常不会用上述的trait来创建 Future,而是结合使用 async,编译器会将被 async 修饰的函数或代码块转化为 Future。也就是说调用 async fn 的具体函数并不会立刻执行,而只是创建 Future 等待 poll 来推动状态机。
// async fnasyncfnfetch_data() ->Result<String, Error> {letresp = reqwest::get("https://example.com").await?;Ok(resp.text().await?) }// async 代码块letfuture =async{// 异步逻辑letdata = expensive_computation().await;format!("Result: {}", data) };await
上述 Future trait中,poll 是核心方法,用于推进状态机的进行。我们的代码不会直接调用 poll,而是通过 Rust 的关键字 .await 来执行这个 Future,await 会被 rust 在编译时生成代码来调用 poll,返回 Poll(见下),如果是 Pending 则被 runtime 挂起(比如重新放到任务队列中)。当有 event 产生时,挂起的 future 会被唤醒,Rust 会再次调用 future 的 poll,如果此时返回 Ready 就执行完成。
pub enumPoll<T>{ Ready(T), Pending, }多级 Future 嵌套时,只有遇到类似 .await 才会推动执行,是协同式调度而不是抢占式调度(Tokio 1.x版本引入抢占机制来缓解饥饿问题,但rust原生基础是协同式调度)。因此 rust 无需提前为 Future 分配独立的栈或堆上内存,是一种零成本抽象。
总结
如下图所示,rust std 中的异步只维护 Future 以及内部方法 poll,具体的任务队列和调度方法由第三方的 runtime 来实现。每次代码执行到 .await 时会进行一次poll,poll 若 ready 则直接退出表示执行完成,poll 若遇到阻塞,则挂起等待事件池来唤醒。当有事件(例如I/O等)唤醒之后,会把该挂起 Future 封装为 task,加入到任务队列中等待调度,runtime 会不断地从任务队列中拿出任务来执行。
Tokio 架构与构造过程
架构
承接上文,这一部分主要介绍 Tokio 实现的 runtime架构,如下图所示:
Tokio 会启动很多 Worker,每个 Worker 对应一个线程,并发完成异步任务。Worker 内部又主要包括 任务队列 和 Driver引用。Driver 是异步运行时中的驱动引擎,它通过操作系统提供的 I/O 多路复用机制(如 Linux 上的epoll、Windows 上的IOCP、macOS 上的kqueue)来监听 I/O 事件和定时器事件,并在事件就绪后唤醒对应的 Future 继续执行。Tokio 中的 Driver 又分为I/O和Time,I/O负责监听socket和文件读写,Time负责sleep等定时任务。Tokio 将 Future 封装为任务,其中主要包含了waker,waker挂载到 Driver 中,当 Driver 有新事件时会回调 waker 来停止阻塞,并加入到任务队列中。任务会加入到任务队列中,任务队列又分为每个 Worker 内部的,以及全局的队列。通常在 Worker 创建的任务即加入到内部队列,全局创造的加入到全局队列,全局队列的任务会被任何 Worker 线程执行到。构造(build)
Tokio 中的 Runtime 结构体如下:
pubstructRuntime{/// Task schedulerscheduler: Scheduler,/// Handle to runtime, also contains driver handleshandle: Handle,/// Blocking pool handle, used to signal shutdownblocking_pool: BlockingPool, }blocking线程 和 worker线程:worker线程是我们要重点关注的运行时轻量级线程,负责调度和任务执行;blocking线程是在这个过程中的所有的阻塞任务,其数量等于所有的worker线程数量+其他控制线程数量,原因是worker线程本身就是一个blocking任务,其他控制线程又包括信号与通道等。
其中 BlockingPool 是专门用来运行阻塞任务的线程池,上述解释已简单概括;Handle 维护了过程中各种handler,本文不重点关注这两项。Scheduler 是任务池和调度器的封装,也是 Runtime 最核心的部分。
想要使用 Runtime 必须要经过初始化:
tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(threads).thread_name(name).build().unwrap(),build() 即构造了 Runtime 结构,其中最重要的是 Driver 和 Worker 。
Driver 构造的过程
阿里云服务器2012 iis
Driver 封装了 I/O 和 Timer 的驱动,并加入了部分机制(例如内存slab),下面以I/O Driver为例,详细说明:
pub(crate)fnnew(nevents:usize) -> io::Result<(Driver, Handle)> {letpoll = mio::Poll::new()?;[cfg(not(target_os ="wasi"))]letwaker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;letregistry = poll.registry().try_clone()?;letdriver = Driver { signal_ready:false, events: mio::Events::with_capacity(nevents), poll, };let(registrations, synced) = RegistrationSet::new();lethandle = Handle { registry, registrations, synced: Mutex::new(synced),[cfg(not(target_os ="wasi"))]waker, metrics: IoDriverMetrics::default(), };Ok((driver, handle)) }初始化mio的poll,底层就是 epoll/kqueue 对象。初始化waker,其是向poll注册一个特殊的事件 TOKEN_WAKEUP初始化driver后同时创建hadle,可以被线程共享,用于传入worker中。worker构造的过程
pub(super)fncreate( size:usize, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, ) -> (Arc, Launch) {letmutcores =Vec::with_capacity(size);letmutremotes =Vec::with_capacity(size);letmutworker_metrics =Vec::with_capacity(size);// Create the local queuesfor_in0..size {let(steal, run_queue) = queue::local();letpark = park.clone();letunpark = park.unpark();letmetrics = WorkerMetrics::from_config(&config);letstats = Stats::new(&metrics); cores.push(Box::new(Core { tick:0, lifo_slot:None, lifo_enabled: !config.disable_lifo_slot, run_queue, is_searching:false, is_shutdown:false, is_traced:false, park:Some(park), global_queue_interval: stats.tuned_global_queue_interval(&config), stats, rand: FastRand::from_seed(config.seed_generator.next_seed()), })); remotes.push(Remote { steal, unpark }); worker_metrics.push(metrics); }let(idle, idle_synced) = Idle::new(size);let(inject, inject_synced) = inject::Shared::new();letremotes_len = remotes.len();lethandle = Arc::new(Handle { task_hooks: TaskHooks::from_config(&config), shared: Shared { remotes: remotes.into_boxed_slice(), inject, idle, owned: OwnedTasks::new(size), synced: Mutex::new(Synced { idle: idle_synced, inject: inject_synced, }), shutdown_cores: Mutex::new(vec![]), trace_status: TraceStatus::new(remotes_len), config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), _counters: Counters, }, driver: driver_handle, blocking_spawner, seed_generator, });letmutlaunch = Launch(vec![]);for(index, core)incores.drain(..).enumerate() { launch.0.push(Arc::new(Worker { handle: handle.clone(), index, core: AtomicCell::new(Some(core)), })); } (handle, launch) }为 Tokio 多线程运行时创建一组 Worker 线程,每个 Worker 都绑定了一个本地任务队列(Local Queue)、I/O 和定时器驱动(Driver),并准备好参与异步任务调度。其中:
创建本地任务队列和任务窃取队列。本地任务队列是一个无锁双端队列,可以用于其他线程的窃取,有关任务窃取在后文详细介绍。也创建了Handle,该Handle可以用来提交新任务、同步执行 Future和提交阻塞任务等。(隐式)通过参数传入 Driver,在线程中即可以通过 Driver 和 Driver Handle 完成 Wake 挂载、I/O事件、推进定时器等。每个Worker 会被传入到 Launch,启动调度循环(无限循环从队列中拿出任务poll)。Tokio task 生命周期
构造出基本的 runtime 架构后,就等待有任务被加入到 runtime 中被调度与执行,这一部分详细说明任务从被构造到执行完成的流程。
如上图所示,Worker 在创建后执行调度循环,不断地从任务队列中取任务,执行poll,若结果为阻塞则注册waker并挂起,之后取新任务poll。当 Driver 有新事件时会调用 waker 来唤醒任务,重新加入到任务队列中待新一轮调用。
任务队列
全局队列(global queue) 有且仅有一个,全局队列是必要的,因为有些任务不属于某个worker,只能加到全局队列中。全局队列是FIFO的设计架构。只有全局队列就会面临所有 worker都从全局队列拿任务,需要加锁影响性能。因此每个worker一个维护本地队列(local run queue),在worker内产生的任务会直接加到自己的本地队列中。本地队列也是FIFO的,保证了公平性,但是不同任务之间的资源会被割裂,调度会浪费CPU缓存,因此又多增了LIFO slot。LIFO slot可以理解为本地队列中的特权任务,例如某些任务spawn出新的任务,且该新任务对旧任务有大量的资源依赖,此时可以将新任务加入到LIFO slot中;或者某些任务通过信号通知其他任务可以被执行,此时该任务可以被加入到LIFO slot中,避免排队带来的延迟。由于LIFO slot会带来不公平的问题,因此只有一个槽位,被占就只能到本地队列的末尾排队。任务窃取:当本地和全局队列中都没有要执行的任务时(此时该worker已经空闲),会尝试窃取其他worker的本地队列的任务。这一机制保障了线程的负载均衡,同时提高系统整体的运行效率。窃取时会从末端开始,因此不会干扰worker的正常进行,该队列也无需加锁。最终,任务队列的执行顺序是:本地LIFO slot --->> 本地任务队列 --->> 全局任务队列 --->> 窃取其他线程的任务。
任务饥饿问题
饥饿问题有以下两种场景:
1.某任务是密集计算型任务,不断占据cpu而不释放;
2.本地任务更新太快太频繁,全局任务无法被执行到;
第一种场景,Tokio 从 1.x开始,引入了抢占式调度来缓解饿死问题,简单来说就是会定期强制任务挂起来让出资源。但是需要说明的是,这种场景本质上和rust的异步并发冲突,更加推荐使用tokio::task::spawn_blocking,来将任务转化为并行计算任务。
第二种场景经常会遇到,本质上是由于三种不同的队列有优先级,可能会导致低优先级的队列被饿死。例如I/O频繁的TCP连接会不断地加入到本地队列,而无法处理全局队列任务。Tokio 的解决的方法是为每个任务加入循环次数,当其循环加入队列次数超过一定上限后会先搁置,优先处理低优先级(例如全局队列)的任务。
总之,Tokio 设计了相关的机制来平衡公平性和效率,同时还有一些其他算法或异步runtime优化了部分过程,取得了更好的效果(例如horaedb(Apache、Ant)、Monoio(bytedance))。虽然并非完美,但是各方都在努力完善公平性和性能,这或许就是rust设计开放runtime的初衷。
其他
与Nginx的调度对比
Nginx采用 多worker进程单线程 + 非阻塞I/O + 事件驱动 的模型。
多进程单线程:最开始时,Nginx会创建多个worker进程,每个进程创建后即可以视为是完全独立的存在,后续的任务执行和资源占用互不干预。所有的worker进程会统一绑定监听某个或某些端口,当某端口有数据时,操作系统会分配给某个worker。(分配方法又分为 SO_REUSEPORT 和 accept_mutex机制,这里的详细机制后续会出一篇文章来详细说明,简单来说就是 SO_REUSEPORT 减少了惊群效应,去除了锁,提高了CPU负载均衡,但是会增加时延不均衡性)。事件驱动:当某个worker 接收到数据后,会将其注册到epoll事件中,并在相应的阶段调用相应的回调函数,主进程(即线程)会轮询执行这些事件回调。主进程是串行执行,无抢占式调度和锁的开销。非阻塞I/O:系统采用非阻塞I/O与多路复用机制(即epoll)来完成高并发处理。两者的对比如下:
在性能方面:Nginx更有利于处理高并发 但是 简单或静态请求,独立进程+事件驱动可以轻松处理高并发,但是进程间隔离不利于复杂处理;Tokio更有利于处理复杂场景,且开发灵活,能够支持许多新协议开发。在开发方面:Nginx基于回调函数开发,同一过程可能需要拆成多个同步函数,面向状态编程较为混乱容易遗忘细节,门槛较高;而Tokio可以基于过程开发,开发逻辑简单,门槛较低。绑核问题
Tokio本身并不支持绑CPU核,要实现绑核有以下三种方法:
直接给进程绑核。不同于Nginx的多进程程序,Tokio本身是单进程多线程程序,因此可以直接在系统层面给进程绑核。taskset-c[CPU NUMBER]-pPIDDocker绑核。类似于上述方法,若Tokio运行于docker上可以直接docker绑核。dockerrun --cpuset-cpus[CPU NUMBER]core_affinity_rs 第三方rust库。该库支持线程级别设置cpu核,可以在创建tokio::runtime时进行绑核设置runtime::Builder::new_multi_thread() .on_thread_start(move || { core_affinity::set_for_current(core_id.clone()); })Tokio 未来的一些方向
IO Uringio_uring是 Linux(>5.1) 上新一代的高性能异步 I/O 框架,其主要针对 epoll 进行了性能和功能上的提升。其设计了一种用户态和内核的环形缓冲区,解决线程竞争,实现无锁设计。减少了系统调用的次数,实现零拷贝。支持批量操作。
当前Tokio官方还没有全面支持 io_uring,但是在社区中已经出现了不少支持:例如tokio-uring,其基本思想是在IO Driver中的mio 进行封装,注册在 Tokio(mio) 上一个 uring fd,而基于这个 fd 和一套自己的 Pending Op 管理系统又对外作为 Reactor 暴露了事件源的能力。
扩展协议栈首先需要明确,若想使用IO或sleep等操作,必须要使用Tokio::net::TcpStream 来进行 Tokio的封装,主要原因是需要将IO或Timer事件注册到Reactor上,因此协议的支持也是非常重要的部分。当前 Tokio支持常见的TCP、UDP等协议,同时也在积极探索新协议以及更好的适配性,例如:更好的 TLS 支持(如整合rustls);支持 HTTP/3、QUIC 等协议栈(底层支持);对IPV6多路复用的增强支持。
日志与调试支持Tokio 正在推动对测试工具的支持,例如:提供 mock I/O 接口;支持 determinism 测试;提供 tracing、instrumentation 集成,便于调试异步程序。相关的社区实践包括 tokio-tesing等。
内部调度算法更新前文也说过,内部的调度算法决定了不同场景下的效果,当前已经有许多 runtime库提出了新的算法来优化某些过程,Tokio 也在积极探索和演进。
不同平台、形式支持Tokio 也在积极探索在不同平台的支持,例如嵌入式平台。同时也在开发WASM支持,可以在WASM中运行Tokio。
声明:Tokio更新频繁,本文主要针对1.44.1版本分析。
参考资料:
Tokio源码:https://github.com/tokio-rs/tokioTokio官方文档:https://tokio.rs/tokio/tutorial字节monoio:https://rustmagazine.github.io/rust_magazine_2021/chapter_12/monoio.htmlapache-haoraedb:https://github.com/apache/horaedbhttps://tony612.github.io/tokio-internals/01_intro_async.htmlhttps://tidb.net/blog/18804515无代理ECS数据备份与高效环境搭建
基于快照提供数据保护和环境搭建,实现无代理且有效可靠的数据备份,同时可以快速克隆部署开发测试环境。
点击无代理 ECS 备份与高效环境搭建-阿里云技术解决方案查看详情。
高校云服务器

发表评论
最近发表
标签列表