在 AWS Lambda 中使用 sequelize
AWS Lambda 是一種無伺服器運算服務,可讓客戶執行程式碼,而無需擔心底層伺服器。如果某些概念未正確理解且未使用適當的組態,在 AWS Lambda 中使用 sequelize
可能會很棘手。本指南旨在闡明其中一些概念,以便該函式庫的使用者可以正確設定 sequelize
以用於 AWS Lambda 並排除問題。
重點總結
如果您只想學習如何正確設定 sequelize
的 連線池 以用於 AWS Lambda,您只需要知道 sequelize
連線池與 AWS Lambda 的 Node.js 執行環境不相容,而且最終會造成的問題多於它能解決的問題。因此,最合適的組態是在同一次調用中使用池化並避免跨調用池化(即在結束時關閉所有連線)
const { Sequelize } = require("sequelize");
let sequelize = null;
async function loadSequelize() {
const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: {
/*
* Lambda functions process one request at a time but your code may issue multiple queries
* concurrently. Be wary that `sequelize` has methods that issue 2 queries concurrently
* (e.g. `Model.findAndCountAll()`). Using a value higher than 1 allows concurrent queries to
* be executed in parallel rather than serialized. Careful with executing too many queries in
* parallel per Lambda function execution since that can bring down your database with an
* excessive number of connections.
*
* Ideally you want to choose a `max` number where this holds true:
* max * EXPECTED_MAX_CONCURRENT_LAMBDA_INVOCATIONS < MAX_ALLOWED_DATABASE_CONNECTIONS * 0.8
*/
max: 2,
/*
* Set this value to 0 so connection pool eviction logic eventually cleans up all connections
* in the event of a Lambda function timeout.
*/
min: 0,
/*
* Set this value to 0 so connections are eligible for cleanup immediately after they're
* returned to the pool.
*/
idle: 0,
// Choose a small enough value that fails fast if a connection takes too long to be established.
acquire: 3000,
/*
* Ensures the connection pool attempts to be cleaned up automatically on the next Lambda
* function invocation, if the previous invocation timed out.
*/
evict: CURRENT_LAMBDA_FUNCTION_TIMEOUT
}
});
// or `sequelize.sync()`
await sequelize.authenticate();
return sequelize;
}
module.exports.handler = async function (event, callback) {
// re-use the sequelize instance across invocations to improve performance
if (!sequelize) {
sequelize = await loadSequelize();
} else {
// restart connection pool to ensure connections are not re-used across invocations
sequelize.connectionManager.initPools();
// restore `getConnection()` if it has been overwritten by `close()`
if (sequelize.connectionManager.hasOwnProperty("getConnection")) {
delete sequelize.connectionManager.getConnection;
}
}
try {
return await doSomethingWithSequelize(sequelize);
} finally {
// close any opened connections during the invocation
// this will wait for any in-progress queries to finish before closing the connections
await sequelize.connectionManager.close();
}
};
使用 AWS RDS Proxy
如果您正在使用 AWS RDS,並且您正在使用 Aurora 或 支援的資料庫引擎,則請使用 AWS RDS Proxy 連線至您的資料庫。這將確保在每次調用時開啟/關閉連線對於您的基礎資料庫伺服器來說不是昂貴的操作。
如果您想了解為什麼您必須在 AWS Lambda 中以這種方式使用 sequelize,請繼續閱讀本文件的其餘部分
Node.js 事件循環
讓 Node.js 能夠執行非阻塞 I/O 操作的原因 — 儘管 JavaScript 是單執行緒的 —
雖然事件循環的實作是在 C++ 中,但這是一個簡化的 JavaScript 偽實作,說明 Node.js 如何執行名為 index.js
的指令碼
// see: https://node.dev.org.tw/en/docs/guides/event-loop-timers-and-nexttick/
// see: https://www.youtube.com/watch?v=P9csgxBgaZ8
// see: https://www.youtube.com/watch?v=PNa9OMajw9w
const process = require('process');
/*
* counter of pending events
*
* reference counter is increased for every:
*
* 1. scheduled timer: `setTimeout()`, `setInterval()`, etc.
* 2. scheduled immediate: `setImmediate()`.
* 3. syscall of non-blocking IO: `require('net').Server.listen()`, etc.
* 4. scheduled task to the thread pool: `require('fs').WriteStream.write()`, etc.
*
* reference counter is decreased for every:
*
* 1. elapsed timer
* 2. executed immediate
* 3. completed non-blocking IO
* 4. completed thread pool task
*
* references can be explicitly decreased by invoking `.unref()` on some
* objects like: `require('net').Socket.unref()`
*/
let refs = 0;
/*
* a heap of timers, sorted by next ocurrence
*
* whenever `setTimeout()` or `setInterval()` is invoked, a timer gets added here
*/
const timersHeap = /* (...) */;
/*
* a FIFO queue of immediates
*
* whenever `setImmediate()` is invoked, it gets added here
*/
const immediates = /* (...) */;
/*
* a FIFO queue of next tick callbacks
*
* whenever `require('process').nextTick()` is invoked, the callback gets added here
*/
const nextTickCallbacks = [];
/*
* a heap of Promise-related callbacks, sorted by promise constructors callbacks first,
* and then resolved/rejected callbacks
*
* whenever a new Promise instance is created via `new Promise` or a promise resolves/rejects
* the appropriate callback (if any) gets added here
*/
const promiseCallbacksHeap = /* ... */;
function execTicksAndPromises() {
while (nextTickCallbacks.length || promiseCallbacksHeap.size()) {
// execute all callbacks scheduled with `process.nextTick()`
while (nextTickCallbacks.length) {
const callback = nextTickCallbacks.shift();
callback();
}
// execute all promise-related callbacks
while (promiseCallbacksHeap.size()) {
const callback = promiseCallbacksHeap.pop();
callback();
}
}
}
try {
// execute index.js
require('./index');
execTicksAndPromises();
do {
// timers phase: executes all elapsed timers
getElapsedTimerCallbacks(timersHeap).forEach(callback => {
callback();
execTicksAndPromises();
});
// pending callbacks phase: executes some system operations (like `TCP errors`) that are not
// executed in the poll phase
getPendingCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
})
// poll phase: gets completed non-blocking I/O events or thread pool tasks and invokes the
// corresponding callbacks; if there are none and there's no pending immediates,
// it blocks waiting for events/completed tasks for a maximum of `maxWait`
const maxWait = computeWhenNextTimerElapses(timersHeap);
pollForEventsFromKernelOrThreadPool(maxWait, immediates).forEach(callback => {
callback();
execTicksAndPromises();
});
// check phase: execute available immediates; if an immediate callback invokes `setImmediate()`
// it will be invoked on the next event loop iteration
getImmediateCallbacks(immediates).forEach(callback => {
callback();
execTicksAndPromises();
});
// close callbacks phase: execute special `.on('close')` callbacks
getCloseCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
});
if (refs === 0) {
// listeners of this event may execute code that increments `refs`
process.emit('beforeExit');
}
} while (refs > 0);
} catch (err) {
if (!process.listenerCount('uncaughtException')) {
// default behavior: print stack and exit with status code 1
console.error(err.stack);
process.exit(1);
} else {
// there are listeners: emit the event and exit using `process.exitCode || 0`
process.emit('uncaughtException');
process.exit();
}
}
Node.js 中的 AWS Lambda 函式處理常式類型
AWS Lambda 處理常式在 Node.js 中有兩種形式
非非同步處理常式(即 callback
)
module.exports.handler = function (event, context, callback) {
try {
doSomething();
callback(null, 'Hello World!'); // Lambda returns "Hello World!"
} catch (err) {
// try/catch is not required, uncaught exceptions invoke `callback(err)` implicitly
callback(err); // Lambda fails with `err`
}
};
非同步處理常式(即使用 async
/await
或 Promise
)
// async/await
module.exports.handler = async function (event, context) {
try {
await doSomethingAsync();
return 'Hello World!'; // equivalent of: callback(null, "Hello World!");
} catch (err) {
// try/cath is not required, async functions always return a Promise
throw err; // equivalent of: callback(err);
}
};
// Promise
module.exports.handler = function (event, context) {
/*
* must return a `Promise` to be considered an async handler
*
* an uncaught exception that prevents a `Promise` to be returned
* by the handler will "downgrade" the handler to non-async
*/
return Promise.resolve()
.then(() => doSomethingAsync())
.then(() => 'Hello World!');
};
雖然乍看之下,非同步與非非同步處理常式似乎只是程式碼樣式選擇,但兩者之間存在根本差異
- 在非同步處理常式中,Lambda 函式的執行會在處理常式傳回的
Promise
解析或拒絕時完成,無論事件循環是否為空。 - 在非非同步處理常式中,Lambda 函式的執行會在下列其中一個條件發生時完成
- 事件循環為空(process
'beforeExit'
事件用於偵測此情況)。 - 調用
callback
引數,且context.callbackWaitsForEmptyEventLoop
設定為false
。
- 事件循環為空(process
要理解 sequelize
如何受到其影響,了解這個根本差異非常重要。以下是一些範例來說明差異
// no callback invoked
module.exports.handler = function () {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
};
// callback invoked
module.exports.handler = function (event, context, callback) {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
callback(null, 'Hello World!');
};
// callback invoked, context.callbackWaitsForEmptyEventLoop = false
module.exports.handler = function (event, context, callback) {
// Lambda finishes BEFORE `doSomething()` is invoked
context.callbackWaitsForEmptyEventLoop = false;
setTimeout(() => doSomething(), 2000);
setTimeout(() => callback(null, 'Hello World!'), 1000);
};
// async/await
module.exports.handler = async function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return 'Hello World!';
};
// Promise
module.exports.handler = function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return Promise.resolve('Hello World!');
};
AWS Lambda 執行環境(即容器)
AWS Lambda 函式處理常式由內建或自訂的 執行環境調用,這些執行環境(即容器)在 可能或可能不會跨調用重複使用。容器一次只能處理 一個請求。Lambda 函式的並行調用表示將為每個並行請求建立容器實例。
實際上,這表示 Lambda 函式應設計為無狀態,但開發人員可以使用狀態來進行快取
let sequelize = null;
module.exports.handler = async function () {
/*
* sequelize will already be loaded if the container is re-used
*
* containers are never re-used when a Lambda function's code change
*
* while the time elapsed between Lambda invocations is used as a factor to determine whether
* a container is re-used, no assumptions should be made of when a container is actually re-used
*
* AWS does not publicly document the rules of container re-use "by design" since containers
* can be recycled in response to internal AWS Lambda events (e.g. a Lambda function container
* may be recycled even if the function is constanly invoked)
*/
if (!sequelize) {
sequelize = await loadSequelize();
}
return await doSomethingWithSequelize(sequelize);
};
當 Lambda 函式不等待事件循環為空且重複使用容器時,事件循環將「暫停」直到下一次調用發生。例如
let counter = 0;
module.exports.handler = function (event, context, callback) {
/*
* The first invocation (i.e. container initialized) will:
* - log:
* - Fast timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 5XX
* - return: 1
*
* Wait 3 seconds and invoke the Lambda again. The invocation (i.e. container re-used) will:
* - log:
* - Slow timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 3XXX
* - Fast timeout invoked. Request id: 11111111-1111-1111-1111-111111111111 | Elapsed ms: 5XX
* - return: 3
*/
const now = Date.now();
context.callbackWaitsForEmptyEventLoop = false;
setTimeout(() => {
console.log(
'Slow timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
}, 1000);
setTimeout(() => {
console.log(
'Fast timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
callback(null, counter);
}, 500);
};
AWS Lambda 中的 Sequelize 連線池
sequelize
使用連線池來最佳化資料庫連線的使用。sequelize
使用的連線池是使用 setTimeout()
回呼(由 Node.js 事件循環處理)實作的。
鑑於 AWS Lambda 容器一次處理一個請求的事實,人們會很想如下設定 sequelize
const { Sequelize } = require('sequelize');
const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: { min: 1, max: 1 }
});
此設定可防止 Lambda 容器使用過多的連線來淹沒資料庫伺服器(因為每個容器最多使用 1 個連線)。它還可以確保容器的連線在閒置時不會被垃圾回收,因此當重新使用 Lambda 容器時,不需要重新建立連線。遺憾的是,此設定會出現一組問題
- 等待事件循環為空的 Lambda 將永遠逾時。
sequelize
連線池會每options.pool.evict
毫秒排程一次setTimeout
,直到所有閒置連線都已關閉。但是,由於min
設定為1
,池中將始終至少有一個閒置連線,從而導致無限事件循環。 - 某些操作,例如
Model.findAndCountAll()
會非同步執行多個查詢(例如Model.count()
和Model.findAll()
)。使用最多一個連線會強制依序執行查詢(而不是使用兩個連線並行執行)。雖然為了維持可管理的資料庫連線數量,這可能是一個可以接受的效能折衷方案,但如果查詢完成所需的時間超過預設或設定的options.pool.acquire
超時時間,則長時間執行的查詢可能會導致ConnectionAcquireTimeoutError
。這是因為序列化查詢會卡在池中,直到釋放其他查詢使用的連線。 - 如果 AWS Lambda 函式逾時(即超過設定的 AWS Lambda 超時時間),無論其狀態如何,Node.js 事件循環都將會「暫停」。這可能會導致競爭條件,從而導致連線錯誤。例如,您可能會遇到以下情況:非常昂貴的查詢導致 Lambda 函式逾時,在昂貴的查詢完成且連線釋放回池之前,「暫停」事件循環,並且如果重新使用容器且在
options.pool.acquire
毫秒後未傳回連線,則後續的 Lambda 調用會失敗並出現ConnectionAcquireTimeoutError
。
您可以嘗試使用 { min: 1, max: 2 }
來緩解問題 #2。但是,這仍然會受到問題 #1 和 #3 的影響,同時還會引入其他問題
- 可能會發生競爭條件,其中事件循環會在連線池逐出回呼執行之前「暫停」,或在 Lambda 調用之間經過的時間超過
options.pool.evict
。這可能會導致逾時錯誤、交握錯誤和其他與連線相關的錯誤。 - 如果您使用類似
Model.findAndCountAll()
的操作,並且底層的Model.count()
或Model.findAll()
查詢失敗,您將無法確保在 Lambda 函式執行完成且事件循環「暫停」之前,已完成執行另一個查詢(並且將連線放回池中)。這可能會讓連線處於陳舊狀態,進而導致 TCP 連線過早關閉和其他與連線相關的錯誤。
使用 { min: 2, max: 2 }
可緩解其他問題 #1。但是,此設定仍然會受到所有其他問題(原始的 #1、#3 和其他 #2)的影響。
詳細的競爭條件範例
為了理解範例,您需要更多關於 Lambda 和 sequelize
的某些部分如何實作的背景資訊。
用於 nodejs.12x
的內建 AWS Lambda 執行環境是以 Node.js 實作的。您可以透過讀取 Node.js Lambda 函式內 /var/runtime/
的內容來存取整個執行環境的原始碼。相關的程式碼子集如下所示:
runtime/Runtime.js
class Runtime {
// (...)
// each iteration is executed in the event loop `check` phase
scheduleIteration() {
setImmediate(() => this.handleOnce().then(/* (...) */));
}
async handleOnce() {
// get next invocation. see: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
let { bodyJson, headers } = await this.client.nextInvocation();
// prepare `context` handler parameter
let invokeContext = new InvokeContext(headers);
invokeContext.updateLoggingContext();
// prepare `callback` handler parameter
let [callback, callbackContext] = CallbackContext.build(
this.client,
invokeContext.invokeId,
this.scheduleIteration.bind(this),
);
try {
// this listener is subscribed to process.on('beforeExit')
// so that when when `context.callbackWaitsForEmptyEventLoop === true`
// the Lambda execution finishes after the event loop is empty
this._setDefaultExitListener(invokeContext.invokeId);
// execute handler
const result = this.handler(
JSON.parse(bodyJson),
invokeContext.attachEnvironmentData(callbackContext),
callback,
);
// finish the execution if the handler is async
if (_isPromise(result)) {
result.then(callbackContext.succeed, callbackContext.fail).catch(callbackContext.fail);
}
} catch (err) {
callback(err);
}
}
}
執行環境會在初始化程式碼的結尾排定一次迭代。
runtime/index.js
// (...)
new Runtime(client, handler, errorCallbacks).scheduleIteration();
Lambda 處理常式使用 sequelize
呼叫的所有 SQL 查詢,最終都會使用 Sequelize.prototype.query() 執行。這個方法負責從連線池取得連線、執行查詢,以及在查詢完成後將連線釋放回連線池。以下程式碼片段顯示了沒有交易的查詢方法邏輯的簡化版本:
sequelize.js
class Sequelize {
// (...)
query(sql, options) {
// (...)
const connection = await this.connectionManager.getConnection(options);
const query = new this.dialect.Query(connection, this, options);
try {
return await query.run(sql, bindParameters);
} finally {
await this.connectionManager.releaseConnection(connection);
}
}
}
欄位 this.connectionManager
是特定方言的 ConnectionManager
類別的實例。所有特定方言的管理程式都繼承自一個抽象的 ConnectionManager
類別,該類別初始化連線池,並將其配置為每次需要建立新連線時呼叫特定方言類別的 connect()
方法。以下程式碼片段顯示了 mysql
方言 connect()
方法的簡化版本:
mysql/connection-manager.js
class ConnectionManager {
// (...)
async connect(config) {
// (...)
return await new Promise((resolve, reject) => {
// uses mysql2's `new Connection()`
const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
reject(e);
};
const connectHandler = () => {
connection.removeListener('error', errorHandler);
resolve(connection);
};
connection.on('error', errorHandler);
connection.once('connect', connectHandler);
});
}
}
欄位 this.lib
指的是 mysql2
,而函式 createConnection()
則透過建立 Connection
類別的實例來建立連線。這個類別的相關子集如下所示:
mysql2/connection.js
class Connection extends EventEmitter {
constructor(opts) {
// (...)
// create Socket
this.stream = /* (...) */;
// when data is received, clear timeout
this.stream.on('data', data => {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.packetParser.execute(data);
});
// (...)
// when handshake is completed, emit the 'connect' event
handshakeCommand.on('end', () => {
this.emit('connect', handshakeCommand.handshake);
});
// set a timeout to trigger if no data is received on the socket
if (this.config.connectTimeout) {
const timeoutHandler = this._handleTimeoutError.bind(this);
this.connectTimeout = Timers.setTimeout(
timeoutHandler,
this.config.connectTimeout
);
}
}
// (...)
_handleTimeoutError() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.stream.destroy && this.stream.destroy();
const err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';
// this will emit the 'error' event
this._handleNetworkError(err);
}
}
基於先前的程式碼,以下事件順序顯示了當 { min: 1, max: 1 }
時,連線池的競爭條件如何導致 ETIMEDOUT
錯誤:
- 收到 Lambda 呼叫(新的容器)。
- 事件迴圈進入
check
階段,並呼叫runtime/Runtime.js
的handleOnce()
方法。handleOnce()
方法呼叫await this.client.nextInvocation()
並等待。
- 事件迴圈跳過
timers
階段,因為沒有待處理的計時器。 - 事件迴圈進入
poll
階段,handleOnce()
方法繼續執行。 - 呼叫 Lambda 處理常式。
- Lambda 處理常式呼叫
Model.count()
,這會呼叫sequelize.js
的query()
,然後呼叫connectionManager.getConnection()
。 - 連線池為
Model.count()
初始化setTimeout(..., config.pool.acquire)
,並呼叫mysql/connection-manager.js
的connect()
來建立新的連線。 mysql2/connection.js
建立 TCP socket,並初始化一個setTimeout()
,以便用ETIMEDOUT
錯誤使連線失敗。- 處理常式傳回的 promise 被拒絕(原因在此處不詳述),因此 Lambda 函式執行完成,並且 Node.js 事件迴圈「暫停」。
- 事件迴圈進入
- 在呼叫之間經過足夠的時間,因此:
config.pool.acquire
計時器經過。mysql2
連線計時器尚未經過,但幾乎已經過(即競爭條件)。
- 收到第二個 Lambda 呼叫(容器重複使用)。
- 事件迴圈「恢復」。
- 事件迴圈進入
check
階段,並呼叫runtime/Runtime.js
的handleOnce()
方法。 - 事件迴圈進入
timers
階段,config.pool.acquire
計時器經過,導致先前呼叫的Model.count()
promise 因ConnectionAcquireTimeoutError
而被拒絕。 - 事件迴圈進入
poll
階段,handleOnce()
方法繼續執行。 - 呼叫 Lambda 處理常式。
- Lambda 處理常式呼叫
Model.count()
,這會呼叫sequelize.js
的query()
,然後呼叫connectionManager.getConnection()
。 - 連線池為
Model.count()
初始化setTimeout(..., config.pool.acquire)
,並且由於{ max : 1 }
,它會等待待處理的connect()
promise 完成。 - 事件迴圈跳過
check
階段,因為沒有待處理的立即執行事項。 - 競爭條件:事件迴圈進入
timers
階段,mysql2
連線逾時經過,導致ETIMEDOUT
錯誤,並使用connection.emit('error')
發出。 - 發出的事件拒絕了
mysql/connection-manager.js
的connect()
中的 promise,而這反過來又將被拒絕的 promise 轉發給Model.count()
查詢的 promise。 - Lambda 函式因
ETIMEDOUT
錯誤而失敗。