diff --git a/docs/pages/apis/client.mdx b/docs/pages/apis/client.mdx index 5867ad5a6..8731c580a 100644 --- a/docs/pages/apis/client.mdx +++ b/docs/pages/apis/client.mdx @@ -56,6 +56,18 @@ const client = new Client() await client.connect() ``` +## client.pipelining + +`client.pipelining: boolean` + +When set to `true`, queries are sent to the server immediately without waiting for previous queries to complete. Defaults to `false`. See [Pipelining](/features/pipelining) for details and examples. + +```js +const client = new Client() +await client.connect() +client.pipelining = true +``` + ## client.query ### QueryConfig diff --git a/docs/pages/apis/pool.mdx b/docs/pages/apis/pool.mdx index 123bc8ba4..7a27e3e3a 100644 --- a/docs/pages/apis/pool.mdx +++ b/docs/pages/apis/pool.mdx @@ -69,6 +69,11 @@ type Config = { // If the function throws or returns a promise that rejects, the client is destroyed // and the error is returned to the caller requesting the connection. onConnect?: (client: Client) => void | Promise + + // When set to true, enables query pipelining on every client the pool creates. + // Pipelined clients send queries to the server without waiting for previous responses. + // Default is false. See /features/pipelining for details. + pipelining?: boolean } ``` diff --git a/docs/pages/features/_meta.js b/docs/pages/features/_meta.js index 62f1660ca..7ddd35a5c 100644 --- a/docs/pages/features/_meta.js +++ b/docs/pages/features/_meta.js @@ -1,6 +1,7 @@ export default { connecting: 'Connecting', queries: 'Queries', + pipelining: 'Pipelining', pooling: 'Pooling', transactions: 'Transactions', types: 'Data Types', diff --git a/docs/pages/features/pipelining.mdx b/docs/pages/features/pipelining.mdx new file mode 100644 index 000000000..11f452b9e --- /dev/null +++ b/docs/pages/features/pipelining.mdx @@ -0,0 +1,132 @@ +--- +title: Pipelining +--- + +import { Alert } from '/components/alert.tsx' + +## What is pipelining? + +By default node-postgres waits for each query to complete before sending the next one. This means every query pays a full network round-trip of latency. **Query pipelining** sends multiple queries to the server without waiting for responses, and the server processes them in order. Each query still gets its own result (or error), but you avoid the idle time between them. + +``` +sequential (default) pipelined +───────────────────── ───────────────────── + client ──Parse──▶ server client ──Parse──▶ server + client ◀──Ready── server ──Parse──▶ + client ──Parse──▶ server ──Parse──▶ + client ◀──Ready── server client ◀──Ready── server + client ──Parse──▶ server client ◀──Ready── server + client ◀──Ready── server client ◀──Ready── server +``` + +In benchmarks, pipelining typically delivers **2-3x throughput** for batches of simple queries on a local connection, with larger gains over higher-latency links. + +## Enabling pipelining + +Pipelining is opt-in. Set `client.pipelining = true` after connecting: + +```js +import { Client } from 'pg' + +const client = new Client() +await client.connect() +client.pipelining = true + +const [r1, r2, r3] = await Promise.all([ + client.query('SELECT 1 AS num'), + client.query('SELECT 2 AS num'), + client.query('SELECT 3 AS num'), +]) + +console.log(r1.rows[0].num, r2.rows[0].num, r3.rows[0].num) // 1 2 3 + +await client.end() +``` + +All query types work with pipelining: plain text, parameterized, and named prepared statements. + +## Pipelining with a pool + +Pass `pipelining: true` in the pool config to enable it on every client the pool creates: + +```js +import { Pool } from 'pg' + +const pool = new Pool({ pipelining: true }) + +const client = await pool.connect() +// client.pipelining is already true + +const [users, orders] = await Promise.all([ + client.query('SELECT * FROM users WHERE id = $1', [1]), + client.query('SELECT * FROM orders WHERE user_id = $1', [1]), +]) + +client.release() +``` + + +
+ pool.query() checks out a client for a single query and releases it immediately, so pipelining has no effect there. Use pool.connect() to check out a client and send multiple queries on it. +
+
+ +## Error isolation + +Each pipelined query gets its own error boundary. A failing query in the middle of a batch does not break the other queries: + +```js +const results = await Promise.allSettled([ + client.query('SELECT 1 AS num'), + client.query('SELECT INVALID SYNTAX'), + client.query('SELECT 3 AS num'), +]) + +console.log(results[0].status) // 'fulfilled' +console.log(results[1].status) // 'rejected' +console.log(results[2].status) // 'fulfilled' +``` + +This works because node-postgres sends a `Sync` message after each query, which is how PostgreSQL delimits error boundaries in the extended query protocol. + +## Named prepared statements + +Named prepared statements work with pipelining. When two pipelined queries share the same statement name, node-postgres sends `Parse` only once and reuses the prepared statement for subsequent queries: + +```js +const queries = Array.from({ length: 100 }, (_, i) => ({ + name: 'get-user', + text: 'SELECT * FROM users WHERE id = $1', + values: [i], +})) + +const results = await Promise.all(queries.map(q => client.query(q))) +``` + +## Graceful shutdown + +Calling `client.end()` while pipelined queries are in flight will wait for all of them to complete before closing the connection: + +```js +client.pipelining = true + +const p1 = client.query('SELECT 1') +const p2 = client.query('SELECT 2') +const endPromise = client.end() + +// Both queries will resolve normally +const [r1, r2] = await Promise.all([p1, p2]) +await endPromise +``` + +## When to use pipelining + +Pipelining is most useful when you have multiple **independent** queries that don't depend on each other's results. Common use cases: + +- Fetching data from multiple tables in parallel for a page load +- Inserting or updating multiple rows simultaneously +- Running a batch of analytics queries + +
+ Do not use pipelining inside a transaction if you need to read the result of one query before issuing the next. Pipelined queries are all sent before any responses arrive, so you cannot branch on intermediate results. For dependent queries within a transaction, use sequential await calls instead. +
diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 2fbdb78d5..95d835c25 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -239,6 +239,9 @@ class Pool extends EventEmitter { newClient(pendingItem) { const client = new this.Client(this.options) + if (this.options.pipelining) { + client.pipelining = true + } this._clients.push(client) const idleListener = makeIdleListener(this, client) diff --git a/packages/pg-pool/test/index.js b/packages/pg-pool/test/index.js index 57a68e01e..9704e5ee1 100644 --- a/packages/pg-pool/test/index.js +++ b/packages/pg-pool/test/index.js @@ -203,6 +203,33 @@ describe('pool', function () { }) }) + it('enables pipelining on clients when configured', async function () { + const pool = new Pool({ pipelining: true }) + const client = await pool.connect() + expect(client.pipelining).to.be(true) + + const [r1, r2, r3] = await Promise.all([ + client.query('SELECT 1 AS num'), + client.query('SELECT 2 AS num'), + client.query('SELECT 3 AS num'), + ]) + + expect(r1.rows[0].num).to.eql(1) + expect(r2.rows[0].num).to.eql(2) + expect(r3.rows[0].num).to.eql(3) + + client.release() + return pool.end() + }) + + it('does not enable pipelining by default', async function () { + const pool = new Pool() + const client = await pool.connect() + expect(client.pipelining).to.be(false) + client.release() + return pool.end() + }) + it('recovers from query errors', function () { const pool = new Pool() diff --git a/packages/pg/bench-pipelining.js b/packages/pg/bench-pipelining.js new file mode 100644 index 000000000..4c949dbf3 --- /dev/null +++ b/packages/pg/bench-pipelining.js @@ -0,0 +1,102 @@ +'use strict' +const pg = require('./lib') + +// Queries to benchmark +const SIMPLE = { text: 'SELECT 1' } +const PARAM = { + text: 'SELECT $1::int AS n', + values: [42], +} +const NAMED = { + name: 'bench-named', + text: 'SELECT $1::int AS n', + values: [42], +} + +async function bench(label, fn, seconds) { + // warmup + for (let i = 0; i < 100; i++) await fn() + + const deadline = Date.now() + seconds * 1000 + let count = 0 + while (Date.now() < deadline) { + await fn() + count++ + } + const qps = (count / seconds).toFixed(0) + console.log(` ${label}: ${qps} qps (${count} queries in ${seconds}s)`) + return count / seconds +} + +async function benchPipelined(label, makeQueries, batchSize, seconds) { + const client = new pg.Client() + await client.connect() + client.pipelining = true + + // warmup + for (let i = 0; i < 10; i++) { + await Promise.all(makeQueries(batchSize).map((q) => client.query(q))) + } + + const deadline = Date.now() + seconds * 1000 + let count = 0 + while (Date.now() < deadline) { + const queries = makeQueries(batchSize) + await Promise.all(queries.map((q) => client.query(q))) + count += batchSize + } + const qps = (count / seconds).toFixed(0) + console.log(` ${label} (batch=${batchSize}): ${qps} qps`) + + await client.end() + return count / seconds +} + +async function runSerial(label, query, seconds) { + const client = new pg.Client() + await client.connect() + + const qps = await bench(label, () => client.query(query), seconds) + await client.end() + return qps +} + +async function run() { + const SECONDS = 5 + const BATCH = 10 + + console.log('\n=== Serial (no pipelining) ===') + const serialSimple = await runSerial('simple SELECT 1', SIMPLE, SECONDS) + const serialParam = await runSerial('parameterized', PARAM, SECONDS) + const serialNamed = await runSerial('named prepared', NAMED, SECONDS) + + console.log('\n=== Pipelined ===') + const pipedSimple = await benchPipelined( + 'simple SELECT 1', + (n) => Array.from({ length: n }, () => ({ text: 'SELECT 1' })), + BATCH, + SECONDS + ) + const pipedParam = await benchPipelined( + 'parameterized', + (n) => Array.from({ length: n }, () => ({ text: 'SELECT $1::int AS n', values: [42] })), + BATCH, + SECONDS + ) + const pipedNamed = await benchPipelined( + 'named prepared', + (n) => Array.from({ length: n }, (_, i) => ({ name: `bench-named-${i}`, text: 'SELECT $1::int AS n', values: [42] })), + BATCH, + SECONDS + ) + + console.log('\n=== Speedup ===') + console.log(` simple: ${(pipedSimple / serialSimple).toFixed(2)}x`) + console.log(` parameterized: ${(pipedParam / serialParam).toFixed(2)}x`) + console.log(` named: ${(pipedNamed / serialNamed).toFixed(2)}x`) +} + +run().catch((e) => { + console.error(e) + process.exit(1) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 9200dded6..71404d75d 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -83,6 +83,8 @@ class Client extends EventEmitter { encoding: this.connectionParameters.client_encoding || 'utf8', }) this._queryQueue = [] + this._sentQueryQueue = [] + this.pipelining = false this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null @@ -126,6 +128,9 @@ class Client extends EventEmitter { this._activeQuery = null } + this._sentQueryQueue.forEach(enqueueError) + this._sentQueryQueue.length = 0 + this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 } @@ -406,6 +411,9 @@ class Client extends EventEmitter { } this._activeQuery = null + if (activeQuery.name) { + delete this.connection.submittedNamedStatements[activeQuery.name] + } activeQuery.handleError(msg, this.connection) } @@ -476,6 +484,7 @@ class Client extends EventEmitter { // it again on the same client if (activeQuery.name) { this.connection.parsedStatements[activeQuery.name] = activeQuery.text + delete this.connection.submittedNamedStatements[activeQuery.name] } } @@ -554,6 +563,10 @@ class Client extends EventEmitter { }) } else if (client._queryQueue.indexOf(query) !== -1) { client._queryQueue.splice(client._queryQueue.indexOf(query), 1) + } else if (client._sentQueryQueue.indexOf(query) !== -1) { + // Query already sent on wire — can't remove it without corrupting the + // pipeline. No-op the callback so the result is silently discarded. + query.callback = () => {} } } @@ -577,6 +590,10 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + if (this.pipelining) { + this._pulsePipelinedQueryQueue() + return + } if (this.readyForQuery === true) { this._activeQuery = this._queryQueue.shift() const activeQuery = this._getActiveQuery() @@ -599,6 +616,31 @@ class Client extends EventEmitter { } } + _pulsePipelinedQueryQueue() { + if (!this._connected || !this._queryable) { + return + } + while (this._queryQueue.length > 0) { + const query = this._queryQueue.shift() + this.hasExecuted = true + const queryError = query.submit(this.connection) + if (queryError) { + process.nextTick(() => { + query.handleError(queryError, this.connection) + }) + continue + } + this._sentQueryQueue.push(query) + } + if (this.readyForQuery && !this._activeQuery && this._sentQueryQueue.length > 0) { + this._activeQuery = this._sentQueryQueue.shift() + this.readyForQuery = false + } + if (!this._activeQuery && this._sentQueryQueue.length === 0 && this._queryQueue.length === 0 && this.hasExecuted) { + this.emit('drain') + } + } + query(config, values, callback) { // can take in strings, config object or query object let query @@ -650,10 +692,15 @@ class Client extends EventEmitter { // just do nothing if query completes query.callback = () => {} - // Remove from queue + // Remove from queue (only safe if not yet sent) const index = this._queryQueue.indexOf(query) if (index > -1) { this._queryQueue.splice(index, 1) + } else if (this.pipelining) { + // Query already sent — the pipeline is blocked until it completes. + // Destroy the connection to unblock all remaining pipelined queries. + this.connection.stream.destroy() + return } this._pulseQueryQueue() @@ -687,7 +734,7 @@ class Client extends EventEmitter { return result } - if (this._queryQueue.length > 0) { + if (this._queryQueue.length > 0 && !this.pipelining) { queryQueueLengthDeprecationNotice() } this._queryQueue.push(query) @@ -715,9 +762,15 @@ class Client extends EventEmitter { } } - if (this._getActiveQuery() || !this._queryable) { - // if we have an active query we need to force a disconnect - // on the socket - otherwise a hung query could block end forever + if (!this._queryable) { + // socket is dead — force close + this.connection.stream.destroy() + } else if (this.pipelining && (this._getActiveQuery() || this._sentQueryQueue.length > 0 || this._queryQueue.length > 0)) { + // pipelined queries are already on the wire (or queued to send) and will + // complete normally; wait for drain then do a graceful goodbye + this.once('drain', () => this.connection.end()) + } else if (this._getActiveQuery()) { + // non-pipelining: a hung query could block end forever — force disconnect this.connection.stream.destroy() } else { this.connection.end() diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 027f93935..c905c609b 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -23,6 +23,7 @@ class Connection extends EventEmitter { this._keepAlive = config.keepAlive this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.parsedStatements = {} + this.submittedNamedStatements = {} this.ssl = config.ssl || false this._ending = false this._emitMessage = false diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 64aab5ff2..de71f5edb 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -153,7 +153,7 @@ class Query extends EventEmitter { if (typeof this.text !== 'string' && typeof this.name !== 'string') { return new Error('A query must have either text or a name. Supplying neither is unsupported.') } - const previous = connection.parsedStatements[this.name] + const previous = connection.parsedStatements[this.name] || connection.submittedNamedStatements[this.name] if (this.text && previous && this.text !== previous) { return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`) } @@ -183,7 +183,7 @@ class Query extends EventEmitter { } hasBeenParsed(connection) { - return this.name && connection.parsedStatements[this.name] + return this.name && (connection.parsedStatements[this.name] || connection.submittedNamedStatements[this.name]) } handlePortalSuspended(connection) { @@ -214,6 +214,9 @@ class Query extends EventEmitter { name: this.name, types: this.types, }) + if (this.name) { + connection.submittedNamedStatements[this.name] = this.text + } } // because we're mapping user supplied values to diff --git a/packages/pg/test/integration/client/pipelining-tests.js b/packages/pg/test/integration/client/pipelining-tests.js new file mode 100644 index 000000000..d9ded61dc --- /dev/null +++ b/packages/pg/test/integration/client/pipelining-tests.js @@ -0,0 +1,152 @@ +'use strict' +const helper = require('./test-helper') +const assert = require('assert') +const suite = new helper.Suite() + +suite.test('basic pipelining with simple queries', async function () { + const client = helper.client() + client.pipelining = true + + const [r1, r2, r3] = await Promise.all([ + client.query('SELECT 1 AS num'), + client.query('SELECT 2 AS num'), + client.query('SELECT 3 AS num'), + ]) + + assert.equal(r1.rows[0].num, 1) + assert.equal(r2.rows[0].num, 2) + assert.equal(r3.rows[0].num, 3) + + await client.end() +}) + +suite.test('pipelining with parameterized queries', async function () { + const client = helper.client() + client.pipelining = true + + const [r1, r2, r3] = await Promise.all([ + client.query('SELECT $1::int AS num', [10]), + client.query('SELECT $1::text AS name', ['hello']), + client.query('SELECT $1::int + $2::int AS sum', [3, 4]), + ]) + + assert.equal(r1.rows[0].num, 10) + assert.equal(r2.rows[0].name, 'hello') + assert.equal(r3.rows[0].sum, 7) + + await client.end() +}) + +suite.test('pipelining with named prepared statements', async function () { + const client = helper.client() + client.pipelining = true + + const [r1, r2] = await Promise.all([ + client.query({ name: 'fetch-num', text: 'SELECT $1::int AS num', values: [42] }), + client.query({ name: 'fetch-num', text: 'SELECT $1::int AS num', values: [99] }), + ]) + + assert.equal(r1.rows[0].num, 42) + assert.equal(r2.rows[0].num, 99) + + await client.end() +}) + +suite.test('pipelining error isolation', async function () { + const client = helper.client() + client.pipelining = true + + const results = await Promise.allSettled([ + client.query('SELECT 1 AS num'), + client.query('SELECT INVALID SYNTAX'), + client.query('SELECT 3 AS num'), + ]) + + assert.equal(results[0].status, 'fulfilled') + assert.equal(results[0].value.rows[0].num, 1) + assert.equal(results[1].status, 'rejected') + assert.equal(results[2].status, 'fulfilled') + assert.equal(results[2].value.rows[0].num, 3) + + await client.end() +}) + +suite.test('pipelining drain event', async function () { + const client = helper.client() + client.pipelining = true + + const drainPromise = new Promise((resolve) => { + client.on('drain', resolve) + }) + + client.query('SELECT 1') + client.query('SELECT 2') + client.query('SELECT 3') + + await drainPromise + await client.end() +}) + +// #12: end() during active pipelining — should drain gracefully, not destroy +suite.test('end() waits for in-flight pipelined queries to complete', async function () { + const client = helper.client() + client.pipelining = true + + // Fire queries then call end() immediately without awaiting them + const p1 = client.query('SELECT 1 AS num') + const p2 = client.query('SELECT 2 AS num') + const endPromise = client.end() + + // All queries should resolve (not error) because end() drains gracefully + const [r1, r2] = await Promise.all([p1, p2]) + assert.equal(r1.rows[0].num, 1) + assert.equal(r2.rows[0].num, 2) + await endPromise +}) + +// #13: named statement error cleanup — submittedNamedStatements not left stale +suite.test('named statement parse error cleans up and allows re-preparation', async function () { + const client = helper.client() + client.pipelining = true + + // Use an invalid type to force a server-side parse error + const err = await client + .query({ name: 'bad-stmt', text: 'SELECT $1::nonexistent_type_xyz', values: [1] }) + .then(() => null) + .catch((e) => e) + + assert.ok(err, 'expected parse to fail') + + // The stale submittedNamedStatements entry should be gone. + // Re-using the same name with valid SQL should work. + const result = await client.query({ name: 'bad-stmt', text: 'SELECT $1::int AS n', values: [42] }) + assert.equal(result.rows[0].n, 42) + + await client.end() +}) + +// #14: query_timeout with pipelining +// When an already-sent pipelined query times out, the connection is destroyed +// to unblock the pipeline — subsequent queries error rather than hanging. +suite.test('query_timeout on sent pipelined query destroys connection to unblock', async function () { + const client = helper.client() + client.pipelining = true + client.on('error', () => {}) // absorb the 'error' event emitted when stream is destroyed + + const results = await Promise.allSettled([ + client.query('SELECT 1 AS num'), + client.query({ text: 'SELECT pg_sleep(30)', query_timeout: 100 }), + client.query('SELECT 3 AS num'), + ]) + + // Query 1 completes before the slow query enters the pipeline + assert.equal(results[0].status, 'fulfilled') + assert.equal(results[0].value.rows[0].num, 1) + + // Query 2 times out + assert.equal(results[1].status, 'rejected') + assert.ok(results[1].reason.message.includes('timeout'), `unexpected error: ${results[1].reason.message}`) + + // Query 3 errors because the connection was destroyed to unblock the pipeline + assert.equal(results[2].status, 'rejected') +}) diff --git a/packages/pg/test/unit/client/simple-query-tests.js b/packages/pg/test/unit/client/simple-query-tests.js index d7d938992..ff55bab0e 100644 --- a/packages/pg/test/unit/client/simple-query-tests.js +++ b/packages/pg/test/unit/client/simple-query-tests.js @@ -114,6 +114,120 @@ test('executing query', function () { }) }) + test('pipelining', function () { + test('sends all queries immediately after readyForQuery', function () { + const client = helper.client() + client.pipelining = true + client.connection.emit('readyForQuery') + client.query('one') + client.query('two') + client.query('three') + assert.lengthIs(client.connection.queries, 3) + assert.equal(client.connection.queries[0], 'one') + assert.equal(client.connection.queries[1], 'two') + assert.equal(client.connection.queries[2], 'three') + }) + + test('completes queries in order', function (done) { + const client = helper.client() + client.pipelining = true + const con = client.connection + con.emit('readyForQuery') + + const results = [] + client.query('one', (err, res) => { + results.push('one') + }) + client.query('two', (err, res) => { + results.push('two') + }) + client.query('three', (err, res) => { + results.push('three') + }) + + // simulate server responding to each query in order + con.emit('readyForQuery') + con.emit('readyForQuery') + con.emit('readyForQuery') + + process.nextTick(() => { + assert.deepStrictEqual(results, ['one', 'two', 'three']) + done() + }) + }) + + test('emits drain after all queries complete', function (done) { + const client = helper.client() + client.pipelining = true + const con = client.connection + con.emit('readyForQuery') + + client.query('one') + client.query('two') + + client.on('drain', () => { + done() + }) + + con.emit('readyForQuery') + con.emit('readyForQuery') + }) + + test('extended protocol: sends parse/bind/sync for each pipelined parameterized query', function () { + const client = helper.client() + client.pipelining = true + const con = client.connection + con.emit('readyForQuery') + + client.query({ text: 'SELECT $1::int', values: [1] }) + client.query({ text: 'SELECT $1::int', values: [2] }) + + // both parse messages should have been sent immediately + assert.lengthIs(con.parseMessages, 2) + assert.equal(con.parseMessages[0].text, 'SELECT $1::int') + assert.equal(con.parseMessages[1].text, 'SELECT $1::int') + // both bind messages too + assert.lengthIs(con.bindMessages, 2) + // each query sends its own sync + assert.equal(con.syncCount, 2) + }) + + test('named statement: parse sent only once when pipelining the same name', function () { + const client = helper.client() + client.pipelining = true + const con = client.connection + con.emit('readyForQuery') + + client.query({ name: 'my-stmt', text: 'SELECT $1::int', values: [1] }) + client.query({ name: 'my-stmt', text: 'SELECT $1::int', values: [2] }) + + // parse sent only once — second query reuses the submitted statement + assert.lengthIs(con.parseMessages, 1) + // both bind messages sent + assert.lengthIs(con.bindMessages, 2) + }) + + test('enabling pipelining while no queries are in flight', function () { + const client = helper.client() + const con = client.connection + con.emit('readyForQuery') + + // start non-pipelining + assert.equal(client.pipelining, false) + client.query('before') + assert.lengthIs(con.queries, 1) + + // simulate server responds + con.emit('readyForQuery') + + // now enable pipelining — should work for subsequent queries + client.pipelining = true + client.query('after-one') + client.query('after-two') + assert.lengthIs(con.queries, 3) + }) + }) + test('handles errors', function () { const client = helper.client() diff --git a/packages/pg/test/unit/client/test-helper.js b/packages/pg/test/unit/client/test-helper.js index 4a3fa9687..8c3c3304e 100644 --- a/packages/pg/test/unit/client/test-helper.js +++ b/packages/pg/test/unit/client/test-helper.js @@ -10,7 +10,22 @@ const makeClient = function (config) { connection.query = function (text) { this.queries.push(text) } + connection.parse = function (msg) { + this.parseMessages.push(msg) + } + connection.bind = function (msg) { + this.bindMessages.push(msg) + } + connection.describe = function (msg) {} + connection.execute = function (msg) {} + connection.sync = function () { + this.syncCount++ + } + connection.flush = function () {} connection.queries = [] + connection.parseMessages = [] + connection.bindMessages = [] + connection.syncCount = 0 const client = new Client({ connection: connection, ...config }) client.connect() client.connection.emit('connect')