nodejs中的并发编程

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

nodejs中的并发编程

royalrover   2020-03-20 我要评论
## 从sleep的实现说起 在nodejs中,如果要实现sleep的功能主要是通过“setTimeout + promise”实现,也可以通过“循环空转”来解决。前者是利用定时器实现任务的延迟执行,并通过promise链管理任务间的时序与依赖,本质上nodejs的执行线程并没有真正的sleep,事件循环以及v8仍在运行,是仅仅表现在业务逻辑上sleep;而后者的实现则无疑实在浪费CPU性能,有点类似自旋锁,不符合大多数场景。 若要实现引擎层面(运行时)的sleep,事情在[ECMAScript Latest Draft (ECMA-262)](https://tc39.es/ecma262/#sec-atomics.wait)出现之后开始有了转机。ECMA262规定了 **Atomics.wait**,它会将调用该方法的代理(引擎)陷入等待队列并让其sleep,直到被notify或者超时。**该规范在8.10.0以上版本的nodejs上被实现。** 事实上,Atomics.wait 的出现主要解决浏览器或nodejs的worker之间数据同步的问题。浏览器上的web-worker、正式被nodejs@12纳入的worker-threads模块,这些都是ECMAScript多线程模型的具体实现。既然出现多线程那么线程间的同步也就不可避免的被提到,在前端以及nodejs范围内可以使用Atomics.wait和notify来解决。 说的有些跑题,回到本节,如何实现运行时的sleep呢?很简单,利用Atomics.wait的等待超时机制: ``` let sharedBuf = new SharedArrayBuffer(4); let sharedArr = new Int32Array(sharedBuf); // 睡眠n秒 let sleep = function(n){ Atomics.wait(sharedArr, 0, 0, n * 1000); } ``` 此处的sleep并不是异步方法,它会阻塞执行线程直到超时,因此需要根据业务场景来使用该sleep模型。 关于Atomics.wait的具体使用方法,下文会着重讲解。 ## 多线程同步 虽然nodejs多线程使用场景不是很多,但是一旦涉及到多线程,那么线程间同步就必不可少,否则无法解决临界区的问题。不过nodejs的work_threads对线程的创建不同于c或者java,它使用libuv的API创建线程 “uv_thread_create”,但是在此之前需要初始化一些设施如MessagePort、v8实例设置等,因此创建一个thread并不是一个轻量级的操作,需要结合场景酌情创建适量的threads。 回到正题,多线程间的同步一般需要依赖锁,而锁的实现需要依赖于全局变量。在nodejs的work_threads实现中,主线程无法设置全局变量,因此可以通过Atomics实现。正如上例中所示,Atomics.wait依赖 **SharedArrayBuffer**,这是共享内存的ArrayBuffer,threads之间可通过它共享数据,可真正操作ArrayBuffer时并不直接使用该对象,而是TypeArray。如Atomics.wait,第一个参数必须是**Int32Array**对象,而该对象指向的缓冲区为SharedArrayBuffer。当线程A因为Atomics.wait而阻塞后,可通过其它线程B调用**Atomics.notify**进行唤醒从而让线程A的v8继续执行。 ``` let { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); var sab = new SharedArrayBuffer(1024); var int32 = new Int32Array(sab); if (isMainThread) { const worker = new Worker(__filename, { workerData: sab }); worker.on('message', (d) => { console.log('parent receive message:', d); }); worker.on('error', (e) => { console.error('parent receive error', e); }); worker.on('exit', (code) => { if (code !== 0) console.error(new Error(`工作线程使用退出码 ${code} 停止`)); }); Atomics.wait(int32, 0, 0); // A console.log(int32[0]); // C: 123 } else { let buf = workerData; let arrs = new Int32Array(buf); Atomics.store(arrs, 0, 123); Atomics.notify(arrs, 0); // B } ``` 上例中,主线程创建thread后,在A处进行阻塞;在新线程中,通过原子操作**Atomics.store**修改SharedArrayBuffer的第一项为123后,于B处唤醒阻塞在SharedArrayBuffer第一项的其它线程;此时主线程被唤醒,执行`console.log(int32[0])`,输出被新线程修改后的SharedArrayBuffer第一项数据123。 ### 锁 分析一个公平、排它、不可重入锁的实现,它使用**Atomics.wait/notify/compareExchange**完成线程的同步。 ``` main-thread.js let Lock = require('./lock').Lock; let { Worker } = require('worker_threads'); const sharedBuffer = new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT); const sharedArray = new Int32Array(sharedBuffer); let worker = new Worker('./worker-lock.js', { workerData: sharedBuffer }); Lock.initialize(sharedArray, 0); const lock = new Lock(sharedArray, 0); // 获取锁 lock.lock(); // 3s后释放锁 setTimeout(() => { lock.unlock(); // (B) }, 3000) ``` ``` worker-thread.js let Lock = require('./lock').Lock; let { parentPort, workerData } = require('worker_threads'); const sharedArray = new Int32Array(workerData); const lock = new Lock(sharedArray, 0); console.log('Waiting for lock...'); // (A) // 获取锁 lock.lock(); // (B) blocks! console.log('Unlocked'); // (C) ``` 主线程初始化互斥锁,同时创建线程,主线程获取锁后三秒钟释放; worker线程尝试获取锁,此时锁已被主线程获取,因此worker线程在此阻塞,等待3s后主线程释放锁被唤醒,继续执行输出。 ``` lock.js const UNLOCKED = 0; const LOCKED_NO_WAITERS = 1; const LOCKED_POSSIBLE_WAITERS = 2; const NUMINTS = 1; class Lock { // 'iab' must be a Int32Array mapping shared memory. // 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock. constructor(iab, ibase) { if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) { throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`); } this.iab = iab; this.ibase = ibase; } static initialize(iab, ibase) { if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) { throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`); } Atomics.store(iab, ibase, UNLOCKED); return ibase; } // Acquire the lock, or block until we can. Locking is not recursive: lock() { const iab = this.iab; const stateIdx = this.ibase; var c; if ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !== UNLOCKED) { // A do { if (c === LOCKED_POSSIBLE_WAITERS || Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !== UNLOCKED) { Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY); } } while ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !== UNLOCKED); // B } } tryLock() { const iab = this.iab; const stateIdx = this.ibase; return Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) === UNLOCKED; } unlock() { const iab = this.iab; const stateIdx = this.ibase; var v0 = Atomics.sub(iab, stateIdx, 1); // Wake up a waiter if there are any if (v0 !== LOCKED_NO_WAITERS) { Atomics.store(iab, stateIdx, UNLOCKED); Atomics.notify(iab, stateIdx, 1); } } toString() { return "Lock:{ibase:" + this.ibase +"}"; } } exports.Lock = Lock; ``` 当进程A尝试获取锁成功时,A处判断语句为false,因此由compareExchange设置状态为LOCKED_NO_WAITERS,直接执行其后续逻辑; 若进程B此时执行lock获取锁时,A处判断为true,进入do while循环体,在wait处sleep; 进程A通过unlock释放锁,会将锁状态置为UNLOCKED,同时唤醒阻塞的进程B; 进程B执行循环判断语句B,此时为false,跳出循环执行B的逻辑。 当然,也可通过tryLock实现自旋锁或者其他逻辑实现非阻塞等待。 ## 参考 [libuv漫谈之线程](https://zhuanlan.zhihu.com/p/25973650) [Atomics](https://tc39.es/ecma262/#sec-atomics.wait) [Atomics MDN](https:/https://img.qb5200.com/download-x/developer.mozilla.org/zh-TWhttps://img.qb5200.com/download-x/docs/Web/JavaScript/Reference/Global_Objects/Atomics)

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们