제목 : Node.js 엔진에서 스레드 추가하기 (Worker)
세상에는 정말 빠른 실행 시간과 계산을 보장하는 언어들이 많다.
벌써 떠오르기를,
- C, C++
- Java
- Python
- Rust
- Swift
- Go
- Kotlin
- 등등..
Node.js 기반의 엔진보다 CPU 실행 성능이 뛰어나며, Memory 절약도 뛰어난 언어일 것이다. (Python 은 약간 더 뛰어날듯?)
JavaScript 는 위의 언어들 중 몇 개들 보다 더 일찍 만들어 졌지만,
웹 페이지에서의 Dynamic 한 인터랙션을 위해 만들어졌다.
기존의 HTML 문서는 DOM (Document Object Model) 로 파싱되어,
JavaScript 를 통해 엘리먼트들의 위치나 속성을 동적으로 변경할 수 있게 되었다.
웹을 위한 언어에서 머무르던 JavaScript 는, Node.js 엔진의 최적화와 TypeScript 의 대중화에 힘입어
많은 사람들의 입문 언어가 되기 시작했고, 그 아성은 끝내 많은 마이크로서비스 아키텍쳐의 서버에 사용되기도 한다.
자바스크립트는 인터프리터 언어로, 기계어나 어셈블리어 수준으로 미리 컴파일되지는 않는다.
JIT(Just In Time) 컴파일 방식을 사용하는데, 이는 런타임 과정에서 코드를 읽는다는 것이다.
당연하겠지만, 이러한 방식은 미리 바이너리로 컴파일 되는 방식보다는 런타임 환경에서 최적화가 낮을 수 밖에 없다.
하지만, 웹 제작 과정에서 자바스크립트는 피할 수 없는 운명이고,
자바스크립트와 관련된 커뮤니티의 방대함과 NPM 패키지 매니저의 편리함은 유저 풀을 더 형성하기에 이르렀다.
그러나, JavaScript 엔진이 타 언어의 런타임보다 느리다는 것은 여전히 치명적으로 작용한다고 생각한다.
코드 리터럴의 편리성과 메타 프로그래밍 언어로서의 동적 속성은 위의 단점을 상쇄하기도 한다.
팀 프로젝트에서 NestJS 를 이용한 메타 프로그래밍을 접한 이후,
JS, TS 와 더불어 메타 프로그래밍을 이해하고자 현재 NestJS 강의를 udemy 사이트에서 듣고 있다.
그런데, 공부를 할 수록, NestJS 와 Spring 간의 간극은 더욱 커 질 수 밖에 없을 것 같다는 생각이 들었다.
그도 그럴 것이, NestJS 는 당연히 Node.js 엔진을 사용하기에, Single Thread 와 Event Driven I/O 라는 특성이
타 언어에 비해 사용자 요청을 더 효율적으로 처리할 수 있다는 장점과, 자원 경쟁 문제가 없다는 장점이 있었다.
그렇지만, Single Thread 의 문제점은, 이미지나 비디오 처리, 혹은 CPU 집약 처리에 있어서 쥐약이라는 말과
동일하다고 생각했다.
반면에 Spring 은, 유저의 요청을 처리하기 위해 미리 쓰레드 풀을 만들어 놓고,
들어오는 요청에 쓰레드를 할당시키고 있었다.
이렇게 되면, 결국
싱글 스레드, 이벤트 기반 I/O VS 멀티 스레드, 병렬 처리 가 될 것이라고 생각했다.
당연하게도, I/O 처리에서도 병렬 처리를 수행하는 Spring 이 우위일 것이다.
나는 이러한 생각의 과정에서, NestJS 혹은 express 와 같은 라이브러리가,
어떻게 Spring 의 속도를 따라갈 수 있을까? 를 고민했다.
당연하게도, Node.js 엔진 기반의 JavaScript 또한 Thread 를 생성 할 수 있었다.
그러나, 기존의 메모리 pool 을 공유한 상태로 시작하는 것이 아니라,
새로운 메모리 pool 을 생성하여 실행한다는 것이었다.
생각 해 보니, Node.js 는 하나의 스레드에 메모리를 할당하고, 처리 과정은 Event I/O 로 하니,
만약에 새로운 스레드를 생성하고 같은 메모리를 차지하면, Event Loop 과정이 박살날 것 같다는 생각이 든다.
따라서, 다른 메모리 공간을 할당하여 새로운 스레드에 이벤트 루프 할 공간을 마련해 주는 것이다.
만약에 CPU 집약적인 처리를 새로운 스레드에 할당하면,
flowchart LR
Client("클라이언트")
subgraph framework ["Node.js 기반의 프레임워크"]
Node("Main Thread")
Thread("CPU 집약 처리를 위해 생성된 Thread")
Node -- 처리 데이터 전달 : 기다림 --> Thread
Thread -- 데이터 처리 완료 : 완료 --> Node
end
Client <-- 요청/응답 --> Node
메인 스레드는 그대로 클라이언트에게서 밀려오는 요청을 처리하고,
특정 CPU 집약 처리를 새로운 Thread 에게 일임함으로서,
메인 스레드가 집약 처리를 위해 잠깐 I/O 를 멈추는 일은 없게 만드는 것이다.
이 과정에서, child_process
혹은 cluster
라이브러리와는 달리,
worker_threads
라이브러리는 참조할 배열 버퍼를 보내고, 이를 메인 Thread 와 공유 할 수 있다.
이번 포스팅에서는 worker_threads
를 사용 할 것이다.
우선, NestJS 프로젝트에 적용 할 생각을 가지고 있기에,
나는 TypeScript 를 사용하여 worker_threads
를 구현할 것이다.
이 과정에서, 타입스크립트를 업데이트하고,
tsconfig.json
에 devDependencies
에 @types/node
를 추가하면 된다.
$ npm i -D @types/node # 내장 빌트인 함수 참조 가능하게 만들어줌. (TypeScript)
그리고 tsc
명령어로 일일이 컴파일하고 node
로 실행하기 귀찮다면,
HomeBrew 에 ts-node
를 설치하거나,
$ npm i -g ts-node # 전역 명령어로 ts-node 실행 가능
하면 된다. (개인적으로는 패키지 관리 프로그램에 설치..)
워커를 생성하고, 할당하는 Main Thread :
import { Worker } from 'worker_threads';
import * as path from 'path';
function startWorker(): Promise<number> {
return new Promise((resolve, reject) => {
const worker = new Worker(path.resolve(__dirname, './script.js'));
worker.on('message', (result) => {
console.log('Worker 결과:', result);
resolve(result);
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`워커가 비정상 종료 (exit code ${code})`));
}
});
});
}
async function work() {
const result = await startWorker();
console.log(result);
}
work();
script.ts
: Worker
가 실행 할 스크립트가 존재하는 위치 - sub 스레드 시작.
import { parentPort } from 'worker_threads';
let sum = 0;
for (let i = 1; i <= 100_000; i++) {
sum += i;
}
parentPort?.postMessage(sum);
먼저, TypeScript 환경에서 Worker Thread 에 URL 을 넘길 때 문제가 있다.
script.ts
파일이 물론 TypeScript
로 작성되었지만,
결국 Node.js 환경으로 구동되기 때문에, .js
확장자로 실행한다는 것이다.
따라서, TypeScript
로 작성된 파일이더라도, .js
로 입력해 주어야 한다.
만약에 TypeScript
파일을 그대로 넣는다면,
➜ worker-exam ts-node worker-1.ts
(node:3217) ExperimentalWarning: Type Stripping is an experimental feature and might change at any time
(Use `node --trace-warnings ...` to show where the warning was created)
Worker 결과: 5000050000
5000050000
(node:3217) [MODULE_TYPELESS_PACKAGE_JSON] Warning: Module type of file:///Users/xxxx/xxxx/script.ts is not specified and it doesn't parse as CommonJS.
Reparsing as ES module because module syntax was detected. This incurs a performance overhead.
To eliminate this warning, add "type": "module" to /Users/xxxx/xxxx/package.json.
.js
확장자로 바꾸어 준다면,
➜ worker-exam ts-node worker-1.ts
Worker 결과: 5000050000
5000050000
.js
파일에서 다시 .ts
파일을 불러와 컴파일 하는 과정을 없애주므로,
경고 로깅도 뜨지 않는 것을 볼 수 있다.
따라서, 위에 URL 참조로 script.js
를 넣은 것은, 내가 컴파일 된 환경에서
node xxx
로 실행할 것을 염두에 두었기 때문이다.
Node.js 공식 홈페이지의 Worker 소개
https://nodejs.org/docs/latest/api/worker_threads.html
공식 홈페이지에서는, 파일의 가독성을 위해 "하나의 파일 안" 에
- Main Thread 의 작업
- Worker Thread 의 작업
의 내용이 모두 들어가 있다.
이는 굳이 파일의 내용을 나누지 않고, 런타임 환경에서
메인 스레드에서 실행하는가? 혹은 워커 스레드에서 실행하는가? 로 나눌 수 있는 것이다.
Example
import {Worker, isMainThread, parentPort, workerData} from 'worker_threads';
if (isMainThread) {
new Worker(__filename);
} else {
console.log("내부에서 워커가 실행됨!");
console.log(isMainThread);
}
Result :
➜ worker-exam tsc
➜ worker-exam node node-exam-1.js
내부에서 워커가 실행됨!
false
- 통상적으로
.js
파일로 실행하여 경고 로그를 없애기 위해, 컴파일 후node
로.js
파일로 실행. isMainThread
는node
라이브러리에서 제공하는 환경 변수로,
현재 실행중인 스레드 환경에 따라true
orfalse
로 반환된다.
이처럼, isMainThread
이라는 node
의 환경 변수를 통해,
하나의 파일에서 메인 스레드가 실행 할 코드와, 서브 스레드가 실행 할 코드를 분리 할 수 있다.
Worker
라이브러리는 메인 스레드와는 다른 메모리 공간을 가지는 서브 스레드를 만들어 준다.
그렇지만, 메인 스레드에서 각각의 스레드에게 제공할 데이터를 만들어 줄 수도 있고,
혹은 "모든 서브 스레드" 에서 참고할 환경 변수 (정적 데이터?) 를 설정 해 줄 수도 있다.
하나의 서브 스레드에 데이터 전달하기
import { isMainThread, Worker, workerData } from 'worker_threads';
if(isMainThread) {
const worker = new Worker(__filename, {
workerData : {
sendData : "메인 스레드가 서브 스레드로 이 문제열을 제공합니다."
}
})
} else {
console.log("서브 스레드 시작.");
const sendData = workerData.sendData;
console.log(sendData);
}
Result :
➜ worker-exam tsc
➜ worker-exam node node-exam-2.js
서브 스레드 시작.
메인 스레드가 서브 스레드로 이 문제열을 제공합니다.
new Worker(...)
실행 시,
이 생성자 식에 1 번쨰로는 "실행 할 파일의 경로" 를 의미하고,
2 번째는 선택적인데, "서브 스레드에서 참조할 데이터" 를 의미한다.
그리고 나서 Sub Thread 에서 실행 할 때,
자신에게 주어진 참조 데이터를 알기 위해 workerData
를 라이브러리에서 가져오고,
여기서 sendData
를 추출하는 것이다.
물론, workerData
는 any
형식이므로, 단순하게 primitive 타입으로 넣어도 된다.
나는 예시로 객체를 넣어봤다.
모든 서브 스레드에서 참조할 환경 변수 설정하기
import { isMainThread, Worker, getEnvironmentData, setEnvironmentData } from 'node:worker_threads';
if (isMainThread) {
setEnvironmentData(1, 2);
setEnvironmentData("MyKey", "MyValue");
const worker = new Worker(__filename);
} else {
console.log("서브 스레드 시작 ----");
console.log(`공통 환경 변수 1 은, = ${getEnvironmentData(1)}`)
console.log(`공통 환경 변수 "MyKey" 는, = ${getEnvironmentData("MyKey")}`);
}
Result :
➜ worker-exam tsc
➜ worker-exam node node-exam-3.js
서브 스레드 시작 ----
공통 환경 변수 1 은, = 2
공통 환경 변수 "MyKey" 는, = MyValue
메인 스레드에서 서브 스레드가 공통으로 참조할 수 있는 환경 변수 2 개를 만들었다.
1
:2
"MyKey"
:"MyValue"
서브 스레드는 메인 스레드가 등록 해 놓은 환경 변수를 참조하여,
2
와 "MyValue"
를 출력하고 있다.
위에서 제시한 데이터 전달 방식은,
- 스레드가 생성 될 때 마다, 해당 스레드가 참조할 데이터 지정
- 스레드가 생성 될 때 마다, 자동으로 참조 할 수 있는 환경 변수 지정
이렇게 나뉘게 된다.
메인 스레드가 서브 스레드에서의 결과를 기다린다.
사실, 워커 스레드를 배우는 가장 큰 이유가 이것이라고 생각한다.
이 글을 작성하는 이유는, NestJS, Express 와 같은 프레임워크 및 라이브러리는,
따로 스레드를 생성하지 않고, 싱글 스레드로 작동한다.
이로 인해 Input, Output 과정에서는 기다림이 없지만,
해당 서버를 사용하는 클라이언트 입장에서는 하나의 서버가 자신들의 모든 요청을 처리하는 것이다.
이러한 특성으로 인해, 내부에 CPU 집약적인 과정에 포함 될 경우, 데이터 반환이 느려질 수 있다.
모든 클라이언트 요청을 동시에 수행하지만, 어떠한 클라이언트 요청이 CPU 집약적이라면,
일부 클라이언트의 응답이 느려질 수 밖에 없는 것이다. - 어쨋뜬 CPU 집약 작업을 해야 하기에.
따라서, 메인 스레드는 클라이언트의 요청과 응답에 집중하되,
서브 스레드 Worker
를 생성하여, CPU 집약 과정을 따로 수행하도록 만드는 것이다.
이러한 과정을 사용하면, Node.js 특유의 싱글 스레드를 타파할 수 있다.
그렇다면, Worker
는 CPU 집약 과정을 수행하고, Main Thread
는 그 과정을 기다린다.
그 후, Main Thread
가 클라이언트에게 수행된 결과를 보내준다.
하지만, Node.js
환경에서 "어떻게 기다릴 것" 인가?
이 방식은 Promise
로 해결할 수 있다.
왜 Promise 인가?
내가 위에서 제공한 방식은, 메인 스레드가 서브 스레드를 생성하고, 기다리지 않는다.
그러니까, 단순하게 워커를 생성하고 실행하게 방치 할 뿐, 기다리지는 않는다.
sequenceDiagram
Main-Thread->>Worker-Thread: 새로운 워커 할당
Main-Thread->>Worker-Thread: 또 다시 워커 할당
Worker-Thread-->>Main-Thread: 반환하지를 않음.
만약에 Promise 로 감싸지 않는다면?
sequenceDiagram
Client->>Main-Thread: 요청
Main-Thread->>Worker-Thread: CPU 계산 요청
Main-Thread->>Client: 응답
Worker-Thread->>Main-Thread: CPU 계산 완료 및 응답(이미 늦음)
Node.js 서버의 특유 이벤트 루프로 인해, 기다리지 않고 그대로 반환한다.
이를 위해, Promise
객체를 이용하여, Worker
가 특정 이벤트를 받아
resolve
할 때 까지 기다리게 만드는 것이다.
그렇게 한다면,
sequenceDiagram
Client->>Main-Thread: 계산 요청
Main-Thread->>Worker-Thread: CPU 계산 요청
Worker-Thread->>Main-Thread: CPU 계산 완료 및 응답
Main-Thread->>Client: 계산 결과 응답
순차적으로 진행 할 수 있다.
Promise
객체는 "약속" 을 의미한다.
어떠한 약속이냐면, Promise
객체는 무조건 resolve
, reject
둘 중 하나의 상태를 지닌다는 약속이다.
그 때 까지, Promise
객체를 부른 함수는 resolve
, reject
가 될 때 까지 기다린다.
물론, 함수는 async 를 사용해야 한다.
이를 사용하여 코드를 작성 해 보자.
import { Worker, isMainThread, workerData } from 'node:worker_threads';
import { parentPort } from 'worker_threads';
if (isMainThread) {
function getSummary(num : number) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename);
// 워커가 "error" 이벤트를 보낸다면, reject 상태값과 함께(err) 객체를 반환한다.
worker.on('error', (err) => {
reject(err);
});
// 워커로부터 "message" 이벤트를 받는다면, 해당 메세지를 출력하고, 종료한다.
worker.on('message',async (num : number) => {
console.log(`메인 스레드에서 결과 출력 : ${num}`);
resolve(num);
// 완료했으므로, 이제 종료한다.
await worker.terminate();
});
// 워커에 직접 ('message') 이벤트를 보내며, 문자열 데이터를 보낸다.
worker.postMessage(10);
})
}
getSummary(10).then((res) => console.log(res));
} else {
function summary(number : number) {
let sum = 0;
for(let i = 0; i < number; i++) {
sum += i;
}
return sum;
}
parentPort.on('message', (num : number) => {
const sum = summary(num);
parentPort.postMessage(sum);
})
}
Result :
➜ worker-exam tsc
➜ worker-exam node node-exam-4.js
메인 스레드에서 결과 출력 : 45
45
위의 코드는 메인 스레드와 워커 스레드가 수행해야 할 내용이 모두 포함되어 있으므로,
2 개로 나누어서 파악해야 한다.
먼저, worker.on
, parentPort.on
의 의미와 차이점을 알아야 한다.
우선, Worker
는 이벤트를 등록할 수 있다.
이게 어떤 의미냐면,
Main --> Sub 로 이벤트를 보냈을 때, Sub 에서는 어떻게 반응 할 것인가?
Sub --> Main 로 이벤트를 보냈을 때, Main 에서는 어떻게 반응 할 것인가?
이러한 의미이다.
-worker.on
- 의미
생성된 worker 스레드는 자신의 parent 스레드인 main 스레드에 이벤트를 보낼 수 있다.
이 때, 부모 스레드인 Main 스레드가 반응 할 이벤트를 지정하는 것이 worker.on
이다.
-parentPort.on
- 의미
이 문구는 worker 스레드 코드가 이용하며, 메인 스레드가 어떤 이벤트를 주었을 때,
어떤 행동이나 응답을 할 지 서술하는 장소이다.
가장 많이 예제로 사용되는 것이 바로
worker.on('message', () => {...})
- 워커 스레드가message
이벤트와 데이터를 보낸다면.parentPort.on'message', () => {...})
- 부모인 Main 스레드가message
이벤트와 데이터를 보낸다면.
이 두 개이다.
그리고 각자,
- 메인 스레드 --> 워커 스레드 :
worker.postMessage(...)
- 워커 스레드 --> 메인 스레드 :
parentPort.postMessage(...)
로, 서로의 이벤트를 trigger 할 수 있다.
그렇다면, 위의 지식을 기반으로, 메인과 워커 스레드의 코드를 분석 해 보자.
Main Thread
function getSummary(num : number) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename);
// 워커가 "error" 이벤트를 보낸다면, reject 상태값과 함께(err) 객체를 반환한다.
worker.on('error', (err) => {
reject(err);
});
// 워커로부터 "message" 이벤트를 받는다면, 해당 메세지를 출력하고, 종료한다.
worker.on('message',async (num : number) => {
console.log(`메인 스레드에서 결과 출력 : ${num}`);
resolve(num);
// 완료했으므로, 이제 종료한다.
await worker.terminate();
});
// 워커에 직접 ('message') 이벤트를 보내며, 문자열 데이터를 보낸다.
worker.postMessage(10);
})
}
// 워커 스레드 시작 및 기다림 후 응답 결과를 "res" 로 추출
getSummary(10).then((res) => console.log(res));
클라이언트가 요구하는 계산 사항을 워커 스레드를 이용하여 계산하되,
워커 스레드가 계산을 완료 했을 때, 다음 과정을 이어갈 수 있도록, Promise
객체로 감싼다.
여기서 다음 과정을 이어갈 수 있게 만드는 것은 resolve
, reject
로,
에러가 일어났을 때는 error
, 워커가 부모에게 message
이벤트를 트리거한다면,
결과인 num
을 바탕으로 결과를 출력하며, 생성된 워커를 종료한다.
Worker Thread
function summary(number : number) {
let sum = 0;
for(let i = 0; i < number; i++) {
sum += i;
}
return sum;
}
parentPort.on('message', (num : number) => {
const sum = summary(num);
parentPort.postMessage(sum);
})
worker 스레드는 main 스레드로부터 num
변수를 받는데,
이 num
변수는 main 스레드가 message
이벤트 트리거와 함께 제공한다.
이를 바탕으로, 워커 스레드는 num
까지 더하여 합한 수를
"다시" 메인 스레드에게 전달하는데, message
트리거와 함께 수행한다.
메인 스레드와 워커 스레드의 이벤트 트리거를 등록하고,
메인 스레드에서 워커 스레드의 이벤트를 발동(트리거) 시킨다면,
워커 스레드는 결과와 함께 메인 스레드의 이벤트를 발동(트리거) 하며, 워커 스레드는 종료한다.
worker.terminate() 는 뭘까?
이건 생각보다 중요하게 생각 해야 할 부분인데,
워커 스레드에 이벤트가 등록되면, 워커 스레드는 자신만의 이벤트 루프에
이벤트를 등록한다.
물론, resolve(...)
만으로 Promise
문이 충족되어,
프로그램이 넘어가 지며, 로직 상으로는 문제가 없지만,
이것은 컴퓨터 자원을 지속적으로 갉아먹고 있는 것과 동일하기 때문에,
worker.terminate()
를 통해, 기기에 리소스를 반환하는 것은 매우 중요하다.
그 이유는, 앞으로 제작하게 될, Thread Pool 에 매우 중요한 요소이기 때문이다.
흐름의 결과는 어떨까?
sequenceDiagram
Client->>Main Thread: 요청
Main Thread->>Worker Thread: 워커 스레드 생성
Worker Thread->>Worker Thread: 이벤트 등록
Main Thread->>Main Thread: 이벤트 등록
Main Thread->>Worker Thread: 이벤트 trigger ('message')
Worker Thread->>Worker Thread: 계산 수행
Worker Thread->> Main Thread: 이벤트 trigger ('message')
Main Thread->> Client: 결과 데이터 반환
Main Thread->> Worker Thread: 워커 스레드 종료 지시
위에서 진행했던 마지막 예제의 흐름은 이렇다.
하지만, 조금 아쉬운 것은 스레드 간 바로 메모리 공간을 공유하지 않는다는 것이다.
메모리 공간을 공유한다면, 메모리를 새로이 생성하는 불필요한 과정을 생략할 수 있을 것이다.
하지만, 여기서 특유의 우회법은 여전히 존재했다.
SharedArrayBuffer
우리가 workerData
혹은 setEnvironmentData([key], [value])
형식으로
워커 스레드가 참조할 데이터를 전달 해 주거나, 설정 해 주었다.
데이터를 굳이 공유하지 않고, 스레드가 스레드에게 데이터를 전달 해 주는 이유는,
각각의 스레드가 각자의 메모리 공간을 가지고 있기 때문이다. (싱글 스레드로 동시에 여러 일을 수행.)
그러나, Node.js 에서도 스레드 간 메모리를 공유할 수 있는 방식은 존재한다.
바로, SharedArrayBuffer
이다.
SharedArrayBuffer
는, Node.js 환경에서 서로 다른 스레드 환경에서 같은 메모리를 참조하게 해 준다.
SharedArrayBuffer 란?
각자 다른 메모리 공간을 가지고 있는 스레드들이,
같은 메모리 공간을 참조할 수 있게 만들어 주는 배열이다.
메인 스레드가 워커 스레드에게 값을 전달 할 때 해당 스레드의 메모리 공간을 생성하거나,
워커 스레드에게 값을 전달 할 때 메인 스레드 공간의 메모리 공간을 생성 할 필요가 없다.
하지만, 편의성을 주장하는 Node.js 와 달리, 사용하기 위해 몇 가지 절차가 필요하다.
길이는 바이트 단위로 선언해야 한다
SharedArrayBuffer
는 고정된 길이의 원시 바이너리 데이터 버퍼를 표현해야 한다.
따라서, 스레드 간 공유될 이 데이터 버퍼가, 어떤 데이터 타입의 배열을 가지게 되느냐에 따라서
가질 수 있는 길이가 제한된다.
예를 들어, 우리가 일반적으로 사용하는 (Java, C 라고 가정) 정수의 타입은, Int
이며, 이는 4 BYTE 이다.
따라서, SharedArrayBuffer
는 4 바이트 정수 타입으로 사용 할 것이기 때문에,
길이는 4 의 배수가 되어야 한다.
// Node.js 환경에서 편하게 생성하던 배열의 형식
const arr = [1, 2, 3, 4];
// SharedArrayBuffer 에서 사용하는 배열의 형식 - 16 Byte 데이터 버퍼 생성 - 모든 스레드 공유 가능.
const sharedArr = new SharedArrayBuffer(16);
// 이를 바탕으로, 모든 스레드에서 공유 가능한 4 byte Integer 형식의 배열 생성. - Length : 4
const intArr = new Int32Array(sharedArr);
intArr[0] = 5;
intArr[1] = 6;
intArr[2] = 7;
intArr[3] = 8;
//Result
for(let i in arr) {
console.log(`arr 내용물 : ${i}`);
}
// 위와 같은 for..in 방식은 안된다.
for(let i = 0; i < intArr.length; i++) {
console.log(`intArr 내용물 : ${intArr[i]}`);
}
Result
➜ worker-exam node main.js
arr 내용물 : 0
arr 내용물 : 1
arr 내용물 : 2
arr 내용물 : 3
intArr 내용물 : 5
intArr 내용물 : 6
intArr 내용물 : 7
intArr 내용물 : 8
한 번 생성된 배열은 .append()
와 같이 사용해서는 안되고, (byte 단위 데이터 버퍼이기 때문)
전용 메서드인 grow()
로 길이를 늘려야 한다.
물론, 다시 Int32Array
로 감싸야 할 것이다.
SharedArrayBuffer 를 통해 스레드 간 메모리를 공유해보자
이제, Worker
예제와 함께, Main 스레드와 Worker 스레드 간 메모리를 공유 해 보자.
그런데, 나는 커스텀 이벤트 문자열을 만들어서 적용하려고 시도했다.
그럼에도 불구하고 parentPort.on('plus')
와 같은 커스텀 이벤트는 되지 않았다.
커스텀 이벤트를 제작하기 위해서는, 미리 정해진 'message'
로 전달하는 것이
옳다고 한다.
따라서, 모든 이벤트를 'message'
내부에 들어가도록 변경했다.
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
interface MainReceiveMessage {
type : string;
value : undefined | string;
}
interface WorkerReceiveMessage {
type : string;
value : undefined | number;
}
if (isMainThread) {
function TestSharingMemory(arr : number[]) {
return new Promise<Int32Array>((resolve, reject) => {
// 4Byte Integer Array 가정.
const sharedArrayBuffer = new SharedArrayBuffer(4 * arr.length);
// int 배열로 사용할 수 있도록 제작.
const int32Array = new Int32Array(sharedArrayBuffer);
// 들어온 변수를 공유 배열 메모리에 할당한다.
for(let i = 0; i < arr.length; i++) {
int32Array[i] = arr[i];
}
// 배열 계산 과정을 수행할 워커 생성(초기화)
const worker = new Worker(__filename, {
workerData : int32Array
});
// 워커에게 이 에러를 받으면, 이러한 행동을 한다고 선언.
worker.on('error', async (err) => {
reject(err);
await worker.terminate();
})
worker.on('message', async (message : MainReceiveMessage) => {
if(message.type === 'done'){
console.log("지정된 워커의 계산이 끝남.")
console.log(message.value);
} else if(message.type === 'close') {
resolve(int32Array);
await worker.terminate();
} else {
console.log(`워커 스레드가 메세지를 전해옴 : ${message}`);
}
})
worker.postMessage({
type : 'plus',
value : 10
});
worker.postMessage({
type : 'minus',
value : 20
});
worker.postMessage({
type : 'close'
});
})
}
TestSharingMemory([1, 2, 3, 4, 5]).then((arr) => {
console.log("---------------");
console.log(`최종 결과 : `);
console.log(arr.toString());
})
} else {
function calculatePlusArr(arr : Int32Array, num : number) {
// 공유된 배열 메모리의 각 요소를 주어진 만큼 증가시킨다.
for(let i = 0; i < arr.length; i++) {
arr[i] = arr[i] + num;
}
}
function calculateMinusArr(arr : Int32Array, num : number) {
// 공유된 배열 메모리의 각 요소를 주어진 만큼 감소시킨다.
for(let i = 0; i < arr.length; i++) {
arr[i] = arr[i] - num;
}
}
// 워커 데이터로 공유 메모리를 참조한다.
const int32arr : Int32Array = workerData;
parentPort.on('message', (msg : WorkerReceiveMessage) => {
switch (msg.type) {
case "plus":
parentPort.postMessage("플러스 계산 수행 시작 : " + msg.value);
calculatePlusArr(int32arr, msg.value);
parentPort.postMessage({
type : "done",
value : int32arr.toString()
})
break;
case "minus":
parentPort.postMessage("마이너스 계산 수행 : " + msg.value);
calculateMinusArr(int32arr, msg.value);
parentPort.postMessage({
type : "done",
value : int32arr.toString()
});
break;
case "close":
console.log("메인 스레드에서 종료 메세지가 왔으니, 상태를 확인하고 반환.");
// 확인할 상태가 있다면, 같이 전달.
parentPort.postMessage({
type : "close"
});
break;
}
})
}
Result :
➜ worker-exam node node-exam-5.js
워커 스레드가 메세지를 전해옴 : 플러스 계산 수행 시작 : 10 # parentPort.postMessage("플러스 계산 수행 시작 : " + msg.value);
지정된 워커의 계산이 끝남. # console.log("지정된 워커의 계산이 끝남.")
11,12,13,14,15 # console.log(message.value);
워커 스레드가 메세지를 전해옴 : 마이너스 계산 수행 : 20 # ...
지정된 워커의 계산이 끝남. # ...
-9,-8,-7,-6,-5 # ...
---------------
최종 결과 :
-9,-8,-7,-6,-5 # TestSharingMemory([1, 2, 3, 4, 5]).then((arr) => { ...
메인 스레드에서 종료 메세지가 왔으니, 상태를 확인하고 반환. # 워커를 이제서야 반환한다!
어느 누구라도, 다른 사람이 작성한 코드를 읽기란 매우 어려운 일이다.
자신이 작성한 코드라도 헷갈려 하는 세상에서,
남이 내가 작성한 코드를 한눈에 읽을 것이라는 생각은 버리겠다. (나의 클린 코드의 문제일 것이다!)
따라서, 메인 스레드에서의 코드, 워커 스레드에서의 코드를 분리해서 이해해 보자.
Main Thread
// 메세지 전달 객체를 기반으로 데이터를 전달 할 것이다.
interface MainReceiveMessage {
type : string;
value : undefined | string;
}
// 메인 스레드 코드 시작
function TestSharingMemory(arr : number[]) {
return new Promise<Int32Array>((resolve, reject) => {
// 4Byte Integer Array 가정.
const sharedArrayBuffer = new SharedArrayBuffer(4 * arr.length);
// int 배열로 사용할 수 있도록 제작.
const int32Array = new Int32Array(sharedArrayBuffer);
// 들어온 변수를 공유 배열 메모리에 할당한다.
for(let i = 0; i < arr.length; i++) {
int32Array[i] = arr[i];
}
// 배열 계산 과정을 수행할 워커 생성(초기화)
const worker = new Worker(__filename, {
workerData : int32Array
});
// 워커에게 이 에러를 받으면, 이러한 행동을 한다고 선언.
worker.on('error', async (err) => {
reject(err);
await worker.terminate();
})
worker.on('message', async (message : MainReceiveMessage) => {
if(message.type === 'done'){
console.log("지정된 워커의 계산이 끝남.")
console.log(message.value);
} else if(message.type === 'close') {
resolve(int32Array);
await worker.terminate();
} else {
console.log(`워커 스레드가 메세지를 전해옴 : ${message}`);
}
})
worker.postMessage({
type : 'plus',
value : 10
});
worker.postMessage({
type : 'minus',
value : 20
});
worker.postMessage({
type : 'close'
});
})
}
TestSharingMemory([1, 2, 3, 4, 5]).then((arr) => {
console.log("---------------");
console.log(`최종 결과 : `);
console.log(arr.toString());
})
이 코드를 작성 한 이유는, SharedArrayBuffer
를 이용하여,
서로 다른 쓰레드에서 메모리를 공유 할 수 있다는 것을 증명하기 위함이다.
즉, 서로 다른 메모리 공간을 가질 수 밖에 없는 스레드들에게, 공유 컨텍스트를 제공하는 것이다.
그리고, 메인 스레드와 워커 스레드가 서로에게 이벤트를 날리는데,
이 이벤트는 'message'
로 한정되어 있다는 말을 하는 것이다.
커스텀 이벤트는 등록이 불가하며,
이벤트를 받는 쪽은 on('message', (msg) => {...})
를 작성하며,
이벤트를 보내는 쪽은, postMessage({type : ..., value : ...})
를 작성한다.
여기서, 커스텀 이벤트는 등록할 수 없다.
그렇다면, 메인 스레드에서 워커 스레드에게 "특정 행동" 을 지시하려면 어떻게 해야 할까?
그것은 메세징 객체에 type
을 보내는 것이다.
양방향 통신을 위해 중간에 보낼 수 있는 이벤트는 message
밖에 없다.
그 외에는 특정 상황밖에 없다.
그렇다면, message
이벤트와 함께, 내부 객체를 스스로 정의하는 것이다.
이를 통해 위 코드에서는
- 배열에 특정 값을 모두 더하기
- 배열에 특정 값을 모두 빼기
를 구별하였다.
이 워커 인스턴스를 Promise 로 감싸서 실행하는 TestSharingMemory
함수는,
resolve
를 통해 전달된 정수 배열을 최종 결과로 출력하는 것이다.
Worker Thread
interface WorkerReceiveMessage {
type : string;
value : undefined | number;
}
function calculatePlusArr(arr : Int32Array, num : number) {
// 공유된 배열 메모리의 각 요소를 주어진 만큼 증가시킨다.
for(let i = 0; i < arr.length; i++) {
arr[i] = arr[i] + num;
}
}
function calculateMinusArr(arr : Int32Array, num : number) {
// 공유된 배열 메모리의 각 요소를 주어진 만큼 감소시킨다.
for(let i = 0; i < arr.length; i++) {
arr[i] = arr[i] - num;
}
}
// 워커 데이터로 공유 메모리를 참조한다.
const int32arr : Int32Array = workerData;
parentPort.on('message', (msg : WorkerReceiveMessage) => {
switch (msg.type) {
case "plus":
parentPort.postMessage("플러스 계산 수행 시작 : " + msg.value);
calculatePlusArr(int32arr, msg.value);
parentPort.postMessage({
type : "done",
value : int32arr.toString()
})
break;
case "minus":
parentPort.postMessage("마이너스 계산 수행 : " + msg.value);
calculateMinusArr(int32arr, msg.value);
parentPort.postMessage({
type : "done",
value : int32arr.toString()
});
break;
case "close":
console.log("메인 스레드에서 종료 메세지가 왔으니, 상태를 확인하고 반환.");
// 확인할 상태가 있다면, 같이 전달.
parentPort.postMessage({
type : "close"
});
break;
}
})
Main Thread 에서 자신이 받을 커스텀 이벤트를
message
이벤트에 정의된 전달 객체 내부 type
으로 정의했듯이,
워커 프로세스도 기본적으로 message
이벤트를 받되,
내부 type
에 따라, 서로 다른 함수를 실행하여 공통 배열 메모리 데이터를 변형하거나,
메인 스레드에서 자신을 종료하기 위해 닫는다고 메세징 할 때,
뒤처리를 할 수 있도록 제작하였다.
switch
문을 사용하니, 좀 더 간결해 보여 사용했다.
위의 코드는 외부에서 특정 숫자 배열을 넣고,
해당 결과를 확인할 수 있는 방식으로 제작되었다.
그렇다면, 만약에, 해당 워커를 생성 해 놓고,
종료하지 않으며 재사용 하려면 어떻게 해야 할까?
바로 resolve
를 배열의 결과가 아닌, worker
로 하면 된다.
Main Thread
function TestSharingMemory(arr : number[]) {
return new Promise<Worker> ((resolve, reject) => {
// 4Byte Integer Array 가정.
const sharedArrayBuffer = new SharedArrayBuffer (4 * arr.length);
// int 배열로 사용할 수 있도록 제작.
const int32Array = new Int32Array (sharedArrayBuffer);
// 들어온 변수를 공유 배열 메모리에 할당한다.
for (let i = 0; i < arr.length; i++) {
int32Array[i] = arr[i];
}
// 배열 계산 과정을 수행할 워커 생성(초기화)
const worker = new Worker (__filename, {
workerData: int32Array
});
// 워커에게 이 에러를 받으면, 이러한 행동을 한다고 선언.
worker.on ('error', async (err) => {
reject (err);
await worker.terminate ();
})
// 워커 스레드로부터 ".postMessage(..)" 를 받았을 때 행동한다고 선언.
worker.on ('message', async (message: MainReceiveMessage) => {
if (message.type === 'done') {
console.log ("지정된 워커의 계산이 끝남.")
console.log (message.value);
} else if (message.type === 'close') {
await worker.terminate ();
} else {
console.log (`워커 스레드가 메세지를 전해옴 : ${message}`);
}
})
resolve (worker);
})
}
TestSharingMemory([1, 2, 3, 4, 5]).then((worker) => {
worker.postMessage({
type : 'plus',
value : 10
});
worker.postMessage({
type : 'minus',
value : 20
});
worker.postMessage({
type : 'close'
});
})
Result :
➜ worker-exam tsc
➜ worker-exam node node-exam-6.js
워커 스레드가 메세지를 전해옴 : 플러스 계산 수행 시작 : 10
지정된 워커의 계산이 끝남.
11,12,13,14,15
워커 스레드가 메세지를 전해옴 : 마이너스 계산 수행 : 20
지정된 워커의 계산이 끝남.
-9,-8,-7,-6,-5
메인 스레드에서 종료 메세지가 왔으니, 상태를 확인하고 반환.
외부에서 "직접" 생성된 워커의 종료 시기를 정할 수 있으므로,
이는 즉 워커 풀을 생성하고 보존할 수 있다는 이야기이다.
후기
이 글을 작성하는 데, 3 일이 걸렸다.
싱글 스레드인 Node.js 가 어떻게 멀티 스레드로 운용될 수 있는지에 대한 공부,
그리고 공유 컨텍스트는 무엇으로 생성되는 지에 대해서 공부했다. (EX Atomic
)
이를 통해서, C 언어의 allocate
문법과 비슷하게 데이터 바이트 배열을 생성했다.
자율 문법을 추구하는 Node.js 에서 할당 기법을 사용 할 정도로 공부했으니, 나도 꽤 만족했다.
그리고 가장 중요한 것 중 하나는, Node.js 도 스레드 풀을 가질 수 있다는 것이다.
Node.js 의 스레드 생성 방식을 익히기 전까지는,
Docker 의 컨테이너 마다 하나의 스레드만을 할당하여, 비효율적이라고 생각했다.
그도 그럴 것이, 아무리 Event Driven I/O 방식이라지만, 결국엔 1 개의 스레드이기 때문이다.
심지어 원래 빠른 Java 는 멀티 쓰레드를 통해 연결 I/O 를 담당하는 스레드가 미리 준비되어 있었다.
하지만, 이번에 Node.js 환경에서의 멀티 스레딩 및 병렬 방식을 배우게 되면서,
프레임워크가 해결 해 주지 못하는 CPU 집약 계산을 최적화하는 법을 배우게 되었다.
다음 포스팅은 웹 어셈블리 (Web Assembly) 이다.
나는 Node.js 가 백엔드로서의 위상이 낮을 수 밖에 없다는 것을 깨닫았다.
아무리 편하고 Spring 스럽지만, 결국 속도로 인해 성장의 제한은 걸려있었다.
하지만, 멀티 스레딩에, 웹 어셈블리까지 조합한다면, 더 빨라지지 않을까? 생각한다.
참고 사이트
https://morsoftware.com/blog/fastest-programming-languages
https://nodejs.org/docs/latest/api/worker_threads.html
https://nodejs.org/docs/latest/api/cluster.html
https://developer.mozilla.org/ko/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
'Node.js > 잡다 지식' 카테고리의 다른 글
NestJS 의 Interceptor 는 무엇일까? - (NestInterceptor) (0) | 2025.04.12 |
---|---|
WebAssembly 와 Node.js (0) | 2025.04.08 |
package.json 과 package-lock.json 은 무엇일까? (0) | 2025.04.03 |
JavaScript, TypeScript 에서 진짜 Private 구현하기 (1) | 2025.03.31 |
tsconfig 옵션 의미 (with NestJS) (0) | 2025.03.28 |