跳至主要內容
版本:v6 - 穩定版

在 AWS Lambda 中使用 sequelize

AWS Lambda 是一種無伺服器運算服務,可讓客戶執行程式碼,而無需擔心底層伺服器。如果某些概念未正確理解且未使用適當的組態,在 AWS Lambda 中使用 sequelize 可能會很棘手。本指南旨在闡明其中一些概念,以便該函式庫的使用者可以正確設定 sequelize 以用於 AWS Lambda 並排除問題。

重點總結

如果您只想學習如何正確設定 sequelize連線池 以用於 AWS Lambda,您只需要知道 sequelize 連線池與 AWS Lambda 的 Node.js 執行環境不相容,而且最終會造成的問題多於它能解決的問題。因此,最合適的組態是在同一次調用中使用池化避免跨調用池化(即在結束時關閉所有連線)

const { Sequelize } = require("sequelize");

let sequelize = null;

async function loadSequelize() {
const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: {
/*
* Lambda functions process one request at a time but your code may issue multiple queries
* concurrently. Be wary that `sequelize` has methods that issue 2 queries concurrently
* (e.g. `Model.findAndCountAll()`). Using a value higher than 1 allows concurrent queries to
* be executed in parallel rather than serialized. Careful with executing too many queries in
* parallel per Lambda function execution since that can bring down your database with an
* excessive number of connections.
*
* Ideally you want to choose a `max` number where this holds true:
* max * EXPECTED_MAX_CONCURRENT_LAMBDA_INVOCATIONS < MAX_ALLOWED_DATABASE_CONNECTIONS * 0.8
*/
max: 2,
/*
* Set this value to 0 so connection pool eviction logic eventually cleans up all connections
* in the event of a Lambda function timeout.
*/
min: 0,
/*
* Set this value to 0 so connections are eligible for cleanup immediately after they're
* returned to the pool.
*/
idle: 0,
// Choose a small enough value that fails fast if a connection takes too long to be established.
acquire: 3000,
/*
* Ensures the connection pool attempts to be cleaned up automatically on the next Lambda
* function invocation, if the previous invocation timed out.
*/
evict: CURRENT_LAMBDA_FUNCTION_TIMEOUT
}
});

// or `sequelize.sync()`
await sequelize.authenticate();

return sequelize;
}

module.exports.handler = async function (event, callback) {
// re-use the sequelize instance across invocations to improve performance
if (!sequelize) {
sequelize = await loadSequelize();
} else {
// restart connection pool to ensure connections are not re-used across invocations
sequelize.connectionManager.initPools();

// restore `getConnection()` if it has been overwritten by `close()`
if (sequelize.connectionManager.hasOwnProperty("getConnection")) {
delete sequelize.connectionManager.getConnection;
}
}

try {
return await doSomethingWithSequelize(sequelize);
} finally {
// close any opened connections during the invocation
// this will wait for any in-progress queries to finish before closing the connections
await sequelize.connectionManager.close();
}
};

使用 AWS RDS Proxy

如果您正在使用 AWS RDS,並且您正在使用 Aurora支援的資料庫引擎,則請使用 AWS RDS Proxy 連線至您的資料庫。這將確保在每次調用時開啟/關閉連線對於您的基礎資料庫伺服器來說不是昂貴的操作。


如果您想了解為什麼您必須在 AWS Lambda 中以這種方式使用 sequelize,請繼續閱讀本文件的其餘部分

Node.js 事件循環

Node.js 事件循環

讓 Node.js 能夠執行非阻塞 I/O 操作的原因 — 儘管 JavaScript 是單執行緒的 —

雖然事件循環的實作是在 C++ 中,但這是一個簡化的 JavaScript 偽實作,說明 Node.js 如何執行名為 index.js 的指令碼

// see: https://node.dev.org.tw/en/docs/guides/event-loop-timers-and-nexttick/
// see: https://www.youtube.com/watch?v=P9csgxBgaZ8
// see: https://www.youtube.com/watch?v=PNa9OMajw9w
const process = require('process');

/*
* counter of pending events
*
* reference counter is increased for every:
*
* 1. scheduled timer: `setTimeout()`, `setInterval()`, etc.
* 2. scheduled immediate: `setImmediate()`.
* 3. syscall of non-blocking IO: `require('net').Server.listen()`, etc.
* 4. scheduled task to the thread pool: `require('fs').WriteStream.write()`, etc.
*
* reference counter is decreased for every:
*
* 1. elapsed timer
* 2. executed immediate
* 3. completed non-blocking IO
* 4. completed thread pool task
*
* references can be explicitly decreased by invoking `.unref()` on some
* objects like: `require('net').Socket.unref()`
*/
let refs = 0;

/*
* a heap of timers, sorted by next ocurrence
*
* whenever `setTimeout()` or `setInterval()` is invoked, a timer gets added here
*/
const timersHeap = /* (...) */;

/*
* a FIFO queue of immediates
*
* whenever `setImmediate()` is invoked, it gets added here
*/
const immediates = /* (...) */;

/*
* a FIFO queue of next tick callbacks
*
* whenever `require('process').nextTick()` is invoked, the callback gets added here
*/
const nextTickCallbacks = [];

/*
* a heap of Promise-related callbacks, sorted by promise constructors callbacks first,
* and then resolved/rejected callbacks
*
* whenever a new Promise instance is created via `new Promise` or a promise resolves/rejects
* the appropriate callback (if any) gets added here
*/
const promiseCallbacksHeap = /* ... */;

function execTicksAndPromises() {
while (nextTickCallbacks.length || promiseCallbacksHeap.size()) {
// execute all callbacks scheduled with `process.nextTick()`
while (nextTickCallbacks.length) {
const callback = nextTickCallbacks.shift();
callback();
}

// execute all promise-related callbacks
while (promiseCallbacksHeap.size()) {
const callback = promiseCallbacksHeap.pop();
callback();
}
}
}

try {
// execute index.js
require('./index');
execTicksAndPromises();

do {
// timers phase: executes all elapsed timers
getElapsedTimerCallbacks(timersHeap).forEach(callback => {
callback();
execTicksAndPromises();
});

// pending callbacks phase: executes some system operations (like `TCP errors`) that are not
// executed in the poll phase
getPendingCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
})

// poll phase: gets completed non-blocking I/O events or thread pool tasks and invokes the
// corresponding callbacks; if there are none and there's no pending immediates,
// it blocks waiting for events/completed tasks for a maximum of `maxWait`
const maxWait = computeWhenNextTimerElapses(timersHeap);
pollForEventsFromKernelOrThreadPool(maxWait, immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// check phase: execute available immediates; if an immediate callback invokes `setImmediate()`
// it will be invoked on the next event loop iteration
getImmediateCallbacks(immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// close callbacks phase: execute special `.on('close')` callbacks
getCloseCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
});

if (refs === 0) {
// listeners of this event may execute code that increments `refs`
process.emit('beforeExit');
}
} while (refs > 0);
} catch (err) {
if (!process.listenerCount('uncaughtException')) {
// default behavior: print stack and exit with status code 1
console.error(err.stack);
process.exit(1);
} else {
// there are listeners: emit the event and exit using `process.exitCode || 0`
process.emit('uncaughtException');
process.exit();
}
}

Node.js 中的 AWS Lambda 函式處理常式類型

AWS Lambda 處理常式在 Node.js 中有兩種形式

非非同步處理常式(即 callback

module.exports.handler = function (event, context, callback) {
try {
doSomething();
callback(null, 'Hello World!'); // Lambda returns "Hello World!"
} catch (err) {
// try/catch is not required, uncaught exceptions invoke `callback(err)` implicitly
callback(err); // Lambda fails with `err`
}
};

非同步處理常式(即使用 async/awaitPromise

// async/await
module.exports.handler = async function (event, context) {
try {
await doSomethingAsync();
return 'Hello World!'; // equivalent of: callback(null, "Hello World!");
} catch (err) {
// try/cath is not required, async functions always return a Promise
throw err; // equivalent of: callback(err);
}
};

// Promise
module.exports.handler = function (event, context) {
/*
* must return a `Promise` to be considered an async handler
*
* an uncaught exception that prevents a `Promise` to be returned
* by the handler will "downgrade" the handler to non-async
*/
return Promise.resolve()
.then(() => doSomethingAsync())
.then(() => 'Hello World!');
};

雖然乍看之下,非同步與非非同步處理常式似乎只是程式碼樣式選擇,但兩者之間存在根本差異

  • 在非同步處理常式中,Lambda 函式的執行會在處理常式傳回的 Promise 解析或拒絕時完成,無論事件循環是否為空。
  • 在非非同步處理常式中,Lambda 函式的執行會在下列其中一個條件發生時完成

要理解 sequelize 如何受到其影響,了解這個根本差異非常重要。以下是一些範例來說明差異

// no callback invoked
module.exports.handler = function () {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
};

// callback invoked
module.exports.handler = function (event, context, callback) {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
callback(null, 'Hello World!');
};

// callback invoked, context.callbackWaitsForEmptyEventLoop = false
module.exports.handler = function (event, context, callback) {
// Lambda finishes BEFORE `doSomething()` is invoked
context.callbackWaitsForEmptyEventLoop = false;
setTimeout(() => doSomething(), 2000);
setTimeout(() => callback(null, 'Hello World!'), 1000);
};

// async/await
module.exports.handler = async function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return 'Hello World!';
};

// Promise
module.exports.handler = function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return Promise.resolve('Hello World!');
};

AWS Lambda 執行環境(即容器)

AWS Lambda 函式處理常式由內建或自訂的 執行環境調用,這些執行環境(即容器)在 可能或可能不會跨調用重複使用。容器一次只能處理 一個請求。Lambda 函式的並行調用表示將為每個並行請求建立容器實例。

實際上,這表示 Lambda 函式應設計為無狀態,但開發人員可以使用狀態來進行快取

let sequelize = null;

module.exports.handler = async function () {
/*
* sequelize will already be loaded if the container is re-used
*
* containers are never re-used when a Lambda function's code change
*
* while the time elapsed between Lambda invocations is used as a factor to determine whether
* a container is re-used, no assumptions should be made of when a container is actually re-used
*
* AWS does not publicly document the rules of container re-use "by design" since containers
* can be recycled in response to internal AWS Lambda events (e.g. a Lambda function container
* may be recycled even if the function is constanly invoked)
*/
if (!sequelize) {
sequelize = await loadSequelize();
}

return await doSomethingWithSequelize(sequelize);
};

當 Lambda 函式不等待事件循環為空且重複使用容器時,事件循環將「暫停」直到下一次調用發生。例如

let counter = 0;

module.exports.handler = function (event, context, callback) {
/*
* The first invocation (i.e. container initialized) will:
* - log:
* - Fast timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 5XX
* - return: 1
*
* Wait 3 seconds and invoke the Lambda again. The invocation (i.e. container re-used) will:
* - log:
* - Slow timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 3XXX
* - Fast timeout invoked. Request id: 11111111-1111-1111-1111-111111111111 | Elapsed ms: 5XX
* - return: 3
*/
const now = Date.now();

context.callbackWaitsForEmptyEventLoop = false;

setTimeout(() => {
console.log(
'Slow timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
}, 1000);

setTimeout(() => {
console.log(
'Fast timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
callback(null, counter);
}, 500);
};

AWS Lambda 中的 Sequelize 連線池

sequelize 使用連線池來最佳化資料庫連線的使用。sequelize 使用的連線池是使用 setTimeout() 回呼(由 Node.js 事件循環處理)實作的。

鑑於 AWS Lambda 容器一次處理一個請求的事實,人們會很想如下設定 sequelize

const { Sequelize } = require('sequelize');

const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: { min: 1, max: 1 }
});

此設定可防止 Lambda 容器使用過多的連線來淹沒資料庫伺服器(因為每個容器最多使用 1 個連線)。它還可以確保容器的連線在閒置時不會被垃圾回收,因此當重新使用 Lambda 容器時,不需要重新建立連線。遺憾的是,此設定會出現一組問題

  1. 等待事件循環為空的 Lambda 將永遠逾時。sequelize 連線池會每 options.pool.evict 毫秒排程一次 setTimeout,直到所有閒置連線都已關閉。但是,由於 min 設定為 1,池中將始終至少有一個閒置連線,從而導致無限事件循環。
  2. 某些操作,例如 Model.findAndCountAll() 會非同步執行多個查詢(例如 Model.count()Model.findAll())。使用最多一個連線會強制依序執行查詢(而不是使用兩個連線並行執行)。雖然為了維持可管理的資料庫連線數量,這可能是一個可以接受的效能折衷方案,但如果查詢完成所需的時間超過預設或設定的 options.pool.acquire 超時時間,則長時間執行的查詢可能會導致 ConnectionAcquireTimeoutError。這是因為序列化查詢會卡在池中,直到釋放其他查詢使用的連線。
  3. 如果 AWS Lambda 函式逾時(即超過設定的 AWS Lambda 超時時間),無論其狀態如何,Node.js 事件循環都將會「暫停」。這可能會導致競爭條件,從而導致連線錯誤。例如,您可能會遇到以下情況:非常昂貴的查詢導致 Lambda 函式逾時,在昂貴的查詢完成且連線釋放回池之前,「暫停」事件循環,並且如果重新使用容器且在 options.pool.acquire 毫秒後未傳回連線,則後續的 Lambda 調用會失敗並出現 ConnectionAcquireTimeoutError

您可以嘗試使用 { min: 1, max: 2 } 來緩解問題 #2。但是,這仍然會受到問題 #1#3 的影響,同時還會引入其他問題

  1. 可能會發生競爭條件,其中事件循環會在連線池逐出回呼執行之前「暫停」,或在 Lambda 調用之間經過的時間超過 options.pool.evict。這可能會導致逾時錯誤、交握錯誤和其他與連線相關的錯誤。
  2. 如果您使用類似 Model.findAndCountAll() 的操作,並且底層的 Model.count()Model.findAll() 查詢失敗,您將無法確保在 Lambda 函式執行完成且事件循環「暫停」之前,已完成執行另一個查詢(並且將連線放回池中)。這可能會讓連線處於陳舊狀態,進而導致 TCP 連線過早關閉和其他與連線相關的錯誤。

使用 { min: 2, max: 2 } 可緩解其他問題 #1。但是,此設定仍然會受到所有其他問題(原始的 #1#3 和其他 #2)的影響。

詳細的競爭條件範例

為了理解範例,您需要更多關於 Lambda 和 sequelize 的某些部分如何實作的背景資訊。

用於 nodejs.12x 的內建 AWS Lambda 執行環境是以 Node.js 實作的。您可以透過讀取 Node.js Lambda 函式內 /var/runtime/ 的內容來存取整個執行環境的原始碼。相關的程式碼子集如下所示:

runtime/Runtime.js

class Runtime {
// (...)

// each iteration is executed in the event loop `check` phase
scheduleIteration() {
setImmediate(() => this.handleOnce().then(/* (...) */));
}

async handleOnce() {
// get next invocation. see: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
let { bodyJson, headers } = await this.client.nextInvocation();

// prepare `context` handler parameter
let invokeContext = new InvokeContext(headers);
invokeContext.updateLoggingContext();

// prepare `callback` handler parameter
let [callback, callbackContext] = CallbackContext.build(
this.client,
invokeContext.invokeId,
this.scheduleIteration.bind(this),
);

try {
// this listener is subscribed to process.on('beforeExit')
// so that when when `context.callbackWaitsForEmptyEventLoop === true`
// the Lambda execution finishes after the event loop is empty
this._setDefaultExitListener(invokeContext.invokeId);

// execute handler
const result = this.handler(
JSON.parse(bodyJson),
invokeContext.attachEnvironmentData(callbackContext),
callback,
);

// finish the execution if the handler is async
if (_isPromise(result)) {
result.then(callbackContext.succeed, callbackContext.fail).catch(callbackContext.fail);
}
} catch (err) {
callback(err);
}
}
}

執行環境會在初始化程式碼的結尾排定一次迭代。

runtime/index.js

// (...)

new Runtime(client, handler, errorCallbacks).scheduleIteration();

Lambda 處理常式使用 sequelize 呼叫的所有 SQL 查詢,最終都會使用 Sequelize.prototype.query() 執行。這個方法負責從連線池取得連線、執行查詢,以及在查詢完成後將連線釋放回連線池。以下程式碼片段顯示了沒有交易的查詢方法邏輯的簡化版本:

sequelize.js

class Sequelize {
// (...)

query(sql, options) {
// (...)

const connection = await this.connectionManager.getConnection(options);
const query = new this.dialect.Query(connection, this, options);

try {
return await query.run(sql, bindParameters);
} finally {
await this.connectionManager.releaseConnection(connection);
}
}
}

欄位 this.connectionManager 是特定方言的 ConnectionManager 類別的實例。所有特定方言的管理程式都繼承自一個抽象的 ConnectionManager 類別,該類別初始化連線池,並將其配置為每次需要建立新連線時呼叫特定方言類別的 connect() 方法。以下程式碼片段顯示了 mysql 方言 connect() 方法的簡化版本:

mysql/connection-manager.js

class ConnectionManager {
// (...)

async connect(config) {
// (...)
return await new Promise((resolve, reject) => {
// uses mysql2's `new Connection()`
const connection = this.lib.createConnection(connectionConfig);

const errorHandler = e => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
reject(e);
};

const connectHandler = () => {
connection.removeListener('error', errorHandler);
resolve(connection);
};

connection.on('error', errorHandler);
connection.once('connect', connectHandler);
});
}
}

欄位 this.lib 指的是 mysql2,而函式 createConnection() 則透過建立 Connection 類別的實例來建立連線。這個類別的相關子集如下所示:

mysql2/connection.js

class Connection extends EventEmitter {
constructor(opts) {
// (...)

// create Socket
this.stream = /* (...) */;

// when data is received, clear timeout
this.stream.on('data', data => {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.packetParser.execute(data);
});

// (...)

// when handshake is completed, emit the 'connect' event
handshakeCommand.on('end', () => {
this.emit('connect', handshakeCommand.handshake);
});

// set a timeout to trigger if no data is received on the socket
if (this.config.connectTimeout) {
const timeoutHandler = this._handleTimeoutError.bind(this);
this.connectTimeout = Timers.setTimeout(
timeoutHandler,
this.config.connectTimeout
);
}
}

// (...)

_handleTimeoutError() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.stream.destroy && this.stream.destroy();
const err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';

// this will emit the 'error' event
this._handleNetworkError(err);
}
}

基於先前的程式碼,以下事件順序顯示了當 { min: 1, max: 1 } 時,連線池的競爭條件如何導致 ETIMEDOUT 錯誤:

  1. 收到 Lambda 呼叫(新的容器)。
    1. 事件迴圈進入 check 階段,並呼叫 runtime/Runtime.jshandleOnce() 方法。
      1. handleOnce() 方法呼叫 await this.client.nextInvocation() 並等待。
    2. 事件迴圈跳過 timers 階段,因為沒有待處理的計時器。
    3. 事件迴圈進入 poll 階段,handleOnce() 方法繼續執行。
    4. 呼叫 Lambda 處理常式。
    5. Lambda 處理常式呼叫 Model.count(),這會呼叫 sequelize.jsquery(),然後呼叫 connectionManager.getConnection()
    6. 連線池為 Model.count() 初始化 setTimeout(..., config.pool.acquire),並呼叫 mysql/connection-manager.jsconnect() 來建立新的連線。
    7. mysql2/connection.js 建立 TCP socket,並初始化一個 setTimeout(),以便用 ETIMEDOUT 錯誤使連線失敗。
    8. 處理常式傳回的 promise 被拒絕(原因在此處不詳述),因此 Lambda 函式執行完成,並且 Node.js 事件迴圈「暫停」。
  2. 在呼叫之間經過足夠的時間,因此:
    1. config.pool.acquire 計時器經過。
    2. mysql2 連線計時器尚未經過,但幾乎已經過(即競爭條件)。
  3. 收到第二個 Lambda 呼叫(容器重複使用)。
    1. 事件迴圈「恢復」。
    2. 事件迴圈進入 check 階段,並呼叫 runtime/Runtime.jshandleOnce() 方法。
    3. 事件迴圈進入 timers 階段,config.pool.acquire 計時器經過,導致先前呼叫的 Model.count() promise 因 ConnectionAcquireTimeoutError 而被拒絕。
    4. 事件迴圈進入 poll 階段,handleOnce() 方法繼續執行。
    5. 呼叫 Lambda 處理常式。
    6. Lambda 處理常式呼叫 Model.count(),這會呼叫 sequelize.jsquery(),然後呼叫 connectionManager.getConnection()
    7. 連線池為 Model.count() 初始化 setTimeout(..., config.pool.acquire),並且由於 { max : 1 },它會等待待處理的 connect() promise 完成。
    8. 事件迴圈跳過 check 階段,因為沒有待處理的立即執行事項。
    9. 競爭條件:事件迴圈進入 timers 階段,mysql2 連線逾時經過,導致 ETIMEDOUT 錯誤,並使用 connection.emit('error') 發出。
    10. 發出的事件拒絕了 mysql/connection-manager.jsconnect() 中的 promise,而這反過來又將被拒絕的 promise 轉發給 Model.count() 查詢的 promise。
    11. Lambda 函式因 ETIMEDOUT 錯誤而失敗。