From 99d192dbfda01f718bc9584554af2968afba0158 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Dec 2020 16:02:42 +0100 Subject: [PATCH] Improve connection/protocol layers The implementation of the connection channel, the protocol and handshake are dependent in the other in cross mutual dependency situation. This cross dependency situation could leave to unexpected behaviours when something change on this layers. The solution for this issue was separate the bolt protocol with it handshake process, creation, observers and so on in a different package to hide the details and make easier to spot the unwanted dependencies. The following changes were done to achieve these goals: * [x] Extract a factory for the connection removing the create method * [x] Extract the handshake process and separate in two, the handshake message exchange and the protocol initialisation * [x] Remove the response parsing and response observers management from the connection and put this in the bolt package (`response-handler.js`) * [x] Move the message write responsibility to the protocol object * [x] Remove connection dependency from the stream-observers * [x] Fix the connection integration tests * [x] Improve the unit tests * [x] Do not touch or break test kit tests and stress tests --- src/driver.js | 9 +- src/internal/{ => bolt}/bolt-protocol-util.js | 14 +- src/internal/{ => bolt}/bolt-protocol-v1.js | 185 ++++++-- src/internal/{ => bolt}/bolt-protocol-v2.js | 4 +- src/internal/{ => bolt}/bolt-protocol-v3.js | 43 +- src/internal/{ => bolt}/bolt-protocol-v4x0.js | 34 +- src/internal/{ => bolt}/bolt-protocol-v4x1.js | 33 +- src/internal/{ => bolt}/bolt-protocol-v4x2.js | 2 +- src/internal/{ => bolt}/bolt-protocol-v4x3.js | 7 +- src/internal/bolt/create.js | 168 +++++++ src/internal/bolt/handshake.js | 133 ++++++ src/internal/bolt/index.js | 33 ++ src/internal/{ => bolt}/request-message.js | 6 +- src/internal/bolt/response-handler.js | 189 ++++++++ src/internal/{ => bolt}/routing-table-raw.js | 22 +- src/internal/{ => bolt}/stream-observers.js | 115 ++--- src/internal/connection-channel.js | 349 +++++--------- src/internal/connection-provider-direct.js | 21 +- src/internal/connection-provider-pooled.js | 24 +- src/internal/connection-provider-routing.js | 33 +- src/internal/connection.js | 3 +- src/internal/connectivity-verifier.js | 2 +- src/internal/protocol-handshaker.js | 168 ------- src/internal/rediscovery.js | 2 +- src/result.js | 2 +- src/session.js | 5 +- src/transaction.js | 2 +- .../{ => bolt}/bolt-protocol-v1.test.js | 96 ++-- .../{ => bolt}/bolt-protocol-v2.test.js | 4 +- .../{ => bolt}/bolt-protocol-v3.test.js | 63 +-- .../{ => bolt}/bolt-protocol-v4x0.test.js | 35 +- .../{ => bolt}/bolt-protocol-v4x3.test.js | 79 ++-- test/internal/bolt/index.test.js | 329 ++++++++++++++ .../{ => bolt}/request-message.test.js | 10 +- .../{ => bolt}/routing-table-raw.test.js | 22 +- .../{ => bolt}/stream-observer.test.js | 105 +++-- test/internal/connection-channel.test.js | 430 +++++++++--------- test/internal/connection-delegate.test.js | 2 +- test/internal/protocol-handshaker.test.js | 136 ------ test/internal/rediscovery.test.js | 2 +- test/internal/routing-table.test.js | 2 +- test/internal/test-utils.js | 26 +- test/session.test.js | 2 +- 43 files changed, 1790 insertions(+), 1161 deletions(-) rename src/internal/{ => bolt}/bolt-protocol-util.js (79%) rename src/internal/{ => bolt}/bolt-protocol-v1.js (65%) rename src/internal/{ => bolt}/bolt-protocol-v2.js (92%) rename src/internal/{ => bolt}/bolt-protocol-v3.js (84%) rename src/internal/{ => bolt}/bolt-protocol-v4x0.js (83%) rename src/internal/{ => bolt}/bolt-protocol-v4x1.js (67%) rename src/internal/{ => bolt}/bolt-protocol-v4x2.js (94%) rename src/internal/{ => bolt}/bolt-protocol-v4x3.js (94%) create mode 100644 src/internal/bolt/create.js create mode 100644 src/internal/bolt/handshake.js create mode 100644 src/internal/bolt/index.js rename src/internal/{ => bolt}/request-message.js (98%) create mode 100644 src/internal/bolt/response-handler.js rename src/internal/{ => bolt}/routing-table-raw.js (75%) rename src/internal/{ => bolt}/stream-observers.js (83%) delete mode 100644 src/internal/protocol-handshaker.js rename test/internal/{ => bolt}/bolt-protocol-v1.test.js (71%) rename test/internal/{ => bolt}/bolt-protocol-v2.test.js (89%) rename test/internal/{ => bolt}/bolt-protocol-v3.test.js (77%) rename test/internal/{ => bolt}/bolt-protocol-v4x0.test.js (80%) rename test/internal/{ => bolt}/bolt-protocol-v4x3.test.js (69%) create mode 100644 test/internal/bolt/index.test.js rename test/internal/{ => bolt}/request-message.test.js (96%) rename test/internal/{ => bolt}/routing-table-raw.test.js (86%) rename test/internal/{ => bolt}/stream-observer.test.js (83%) delete mode 100644 test/internal/protocol-handshaker.test.js diff --git a/src/driver.js b/src/driver.js index 657b8653c..360703319 100644 --- a/src/driver.js +++ b/src/driver.js @@ -30,7 +30,7 @@ import { } from './internal/pool-config' import Session from './session' import RxSession from './session-rx' -import { ALL } from './internal/request-message' +import { FETCH_ALL } from './internal/bolt' import { ENCRYPTION_ON, ENCRYPTION_OFF } from './internal/util' const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour @@ -182,7 +182,7 @@ class Driver { * @param {string|string[]} param.bookmarks - The initial reference or references to some previous * transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown. * @param {number} param.fetchSize - The record fetch size of each batch of this session. - * Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config. + * Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config. * @param {string} param.database - The database this session will operate on. * @return {Session} new session. */ @@ -369,12 +369,11 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) { */ function validateFetchSizeValue (rawValue, defaultWhenAbsent) { const fetchSize = parseInt(rawValue, 10) - if (fetchSize > 0 || fetchSize === ALL) { + if (fetchSize > 0 || fetchSize === FETCH_ALL) { return fetchSize } else if (fetchSize === 0 || fetchSize < 0) { throw new Error( - 'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' + - fetchSize + `The fetch size can only be a positive value or ${FETCH_ALL} for ALL. However fetchSize = ${fetchSize}` ) } else { return defaultWhenAbsent diff --git a/src/internal/bolt-protocol-util.js b/src/internal/bolt/bolt-protocol-util.js similarity index 79% rename from src/internal/bolt-protocol-util.js rename to src/internal/bolt/bolt-protocol-util.js index 314ad483c..e1d152575 100644 --- a/src/internal/bolt-protocol-util.js +++ b/src/internal/bolt/bolt-protocol-util.js @@ -16,15 +16,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { newError } from '../error' +import { newError } from '../../error' import { ResultStreamObserver } from './stream-observers' /** * @param {TxConfig} txConfig the auto-commit transaction configuration. - * @param {Connection} connection the connection. + * @param {function(error: string)} onProtocolError called when the txConfig is not empty. * @param {ResultStreamObserver} observer the response observer. */ -function assertTxConfigIsEmpty (txConfig, connection, observer) { +function assertTxConfigIsEmpty (txConfig, onProtocolError = () => {}, observer) { if (txConfig && !txConfig.isEmpty()) { const error = newError( 'Driver is connected to the database that does not support transaction configuration. ' + @@ -32,7 +32,7 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) { ) // unsupported API was used, consider this a fatal error for the current connection - connection._handleFatalError(error) + onProtocolError(error.message) observer.onError(error) throw error } @@ -41,9 +41,9 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) { /** * Asserts that the passed-in database name is empty. * @param {string} database - * @param {Connection} connection + * @param {fuction(err: String)} onProtocolError Called when it doesn't have database set */ -function assertDatabaseIsEmpty (database, connection, observer) { +function assertDatabaseIsEmpty (database, onProtocolError = () => {}, observer) { if (database) { const error = newError( 'Driver is connected to the database that does not support multiple databases. ' + @@ -51,7 +51,7 @@ function assertDatabaseIsEmpty (database, connection, observer) { ) // unsupported API was used, consider this a fatal error for the current connection - connection._handleFatalError(error) + onProtocolError(error.message) observer.onError(error) throw error } diff --git a/src/internal/bolt-protocol-v1.js b/src/internal/bolt/bolt-protocol-v1.js similarity index 65% rename from src/internal/bolt-protocol-v1.js rename to src/internal/bolt/bolt-protocol-v1.js index 3a4206528..d345402be 100644 --- a/src/internal/bolt-protocol-v1.js +++ b/src/internal/bolt/bolt-protocol-v1.js @@ -20,12 +20,12 @@ import { assertDatabaseIsEmpty, assertTxConfigIsEmpty } from './bolt-protocol-util' -import Bookmark from './bookmark' -import { Chunker } from './chunking' -import Connection from './connection' -import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from './constants' -import * as v1 from './packstream-v1' -import { Packer } from './packstream-v1' +import Bookmark from '../bookmark' +import { Chunker } from '../chunking' +import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from '../constants' +import Logger from '../logger' +import * as v1 from '../packstream-v1' +import { Packer } from '../packstream-v1' import RequestMessage from './request-message' import { LoginObserver, @@ -33,19 +33,43 @@ import { ResultStreamObserver, StreamObserver } from './stream-observers' -import TxConfig from './tx-config' +import TxConfig from '../tx-config' export default class BoltProtocol { + /** + * @callback CreateResponseHandler Creates the response handler + * @param {BoltProtocol} protocol The bolt protocol + * @returns {ResponseHandler} The response handler + */ + /** + * @callback OnProtocolError Handles protocol error + * @param {string} error The description + */ /** * @constructor - * @param {Connection} connection the connection. + * @param {Object} server the server informatio. * @param {Chunker} chunker the chunker. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {CreateResponseHandler} createResponseHandler Function which creates the response handler + * @param {Logger} log the logger + * @param {OnProtocolError} onProtocolError handles protocol errors */ - constructor (connection, chunker, disableLosslessIntegers) { - this._connection = connection + constructor ( + server, + chunker, + disableLosslessIntegers, + createResponseHandler = () => null, + log, + onProtocolError + ) { + this._server = server || {} + this._chunker = chunker this._packer = this._createPacker(chunker) this._unpacker = this._createUnpacker(disableLosslessIntegers) + this._responseHandler = createResponseHandler(this) + this._log = log + this._onProtocolError = onProtocolError + this._fatalError = null } /** @@ -91,16 +115,11 @@ export default class BoltProtocol { */ initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( - RequestMessage.init(userAgent, authToken), - observer, - true - ) + this.write(RequestMessage.init(userAgent, authToken), observer, true) return observer } @@ -252,7 +271,7 @@ export default class BoltProtocol { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeKeys, afterKeys, beforeError, @@ -262,16 +281,12 @@ export default class BoltProtocol { }) // bookmark and mode are ignored in this version of the protocol - assertTxConfigIsEmpty(txConfig, this._connection, observer) + assertTxConfigIsEmpty(txConfig, this._onProtocolError, observer) // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( - RequestMessage.run(query, parameters), - observer, - false - ) - this._connection.write(RequestMessage.pullAll(), observer, flush) + this.write(RequestMessage.run(query, parameters), observer, false) + this.write(RequestMessage.pullAll(), observer, flush) return observer } @@ -285,12 +300,12 @@ export default class BoltProtocol { */ reset ({ onError, onComplete } = {}) { const observer = new ResetObserver({ - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onComplete }) - this._connection.write(RequestMessage.reset(), observer, true) + this.write(RequestMessage.reset(), observer, true) return observer } @@ -302,4 +317,116 @@ export default class BoltProtocol { _createUnpacker (disableLosslessIntegers) { return new v1.Unpacker(disableLosslessIntegers) } + + /** + * Write a message to the network channel. + * @param {RequestMessage} message the message to write. + * @param {StreamObserver} observer the response observer. + * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. + */ + write (message, observer, flush) { + const queued = this.queueObserverIfProtocolIsNotBroken(observer) + + if (queued) { + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} C: ${message}`) + } + + this.packer().packStruct( + message.signature, + message.fields.map(field => this.packer().packable(field)) + ) + + this._chunker.messageBoundary() + + if (flush) { + this._chunker.flush() + } + } + } + + /** + * Notifies faltal erros to the observers and mark the protocol in the fatal error state. + * @param {Error} error The error + */ + notifyFatalError (error) { + this._fatalError = error + return this._responseHandler._notifyErrorToObservers(error) + } + + /** + * Updates the the current observer with the next one on the queue. + */ + updateCurrentObserver () { + return this._responseHandler._updateCurrentObserver() + } + + /** + * Checks if exist an ongoing observable requests + * @return {boolean} + */ + hasOngoingObservableRequests () { + return this._responseHandler.hasOngoingObservableRequests() + } + + /** + * Enqueue the observer if the protocol is not broken. + * In case it's broken, the observer will be notified about the error. + * + * @param {StreamObserver} observer The observer + * @returns {boolean} if it was queued + */ + queueObserverIfProtocolIsNotBroken (observer) { + if (this.isBroken()) { + this.notifyFatalErrorToObserver(observer) + return false + } + + return this._responseHandler._queueObserver(observer) + } + + /** + * Veritfy the protocol is not broken. + * @returns {boolean} + */ + isBroken () { + return !!this._fatalError + } + + /** + * Notifies the current fatal error to the observer + * + * @param {StreamObserver} observer The observer + */ + notifyFatalErrorToObserver (observer) { + if (observer && observer.onError) { + observer.onError(this._fatalError) + } + } + + /** + * Reset current failure on the observable response handler to null. + */ + resetFailure () { + this._responseHandler._resetFailure() + } + + _onLoginCompleted (metadata, onCompleted) { + if (metadata) { + const serverVersion = metadata.server + if (!this._server.version) { + this._server.version = serverVersion + } + } + if (onCompleted) { + onCompleted(metadata) + } + } + + _onLoginError (error, onError) { + this._onProtocolError(error.message) + if (onError) { + onError(error) + } + } } diff --git a/src/internal/bolt-protocol-v2.js b/src/internal/bolt/bolt-protocol-v2.js similarity index 92% rename from src/internal/bolt-protocol-v2.js rename to src/internal/bolt/bolt-protocol-v2.js index fabb602ad..533360702 100644 --- a/src/internal/bolt-protocol-v2.js +++ b/src/internal/bolt/bolt-protocol-v2.js @@ -17,8 +17,8 @@ * limitations under the License. */ import BoltProtocolV1 from './bolt-protocol-v1' -import * as v2 from './packstream-v2' -import { BOLT_PROTOCOL_V2 } from './constants' +import * as v2 from '../packstream-v2' +import { BOLT_PROTOCOL_V2 } from '../constants' export default class BoltProtocol extends BoltProtocolV1 { _createPacker (chunker) { diff --git a/src/internal/bolt-protocol-v3.js b/src/internal/bolt/bolt-protocol-v3.js similarity index 84% rename from src/internal/bolt-protocol-v3.js rename to src/internal/bolt/bolt-protocol-v3.js index 3a28346e2..2b4604001 100644 --- a/src/internal/bolt-protocol-v3.js +++ b/src/internal/bolt/bolt-protocol-v3.js @@ -25,9 +25,9 @@ import { ResultStreamObserver, ProcedureRouteObserver } from './stream-observers' -import { BOLT_PROTOCOL_V3 } from './constants' -import Bookmark from './bookmark' -import TxConfig from './tx-config' +import { BOLT_PROTOCOL_V3 } from '../constants' +import Bookmark from '../bookmark' +import TxConfig from '../tx-config' const CONTEXT = 'context' const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` @@ -56,22 +56,17 @@ export default class BoltProtocol extends BoltProtocolV2 { initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( - RequestMessage.hello(userAgent, authToken), - observer, - true - ) + this.write(RequestMessage.hello(userAgent, authToken), observer, true) return observer } prepareToClose () { - this._connection.write(RequestMessage.goodbye(), noOpObserver, true) + this.write(RequestMessage.goodbye(), noOpObserver, true) } beginTransaction ({ @@ -85,7 +80,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -94,9 +89,9 @@ export default class BoltProtocol extends BoltProtocolV2 { observer.prepareToHandleSingleResponse() // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( + this.write( RequestMessage.begin({ bookmark, txConfig, mode }), observer, true @@ -112,7 +107,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -120,7 +115,7 @@ export default class BoltProtocol extends BoltProtocolV2 { }) observer.prepareToHandleSingleResponse() - this._connection.write(RequestMessage.commit(), observer, true) + this.write(RequestMessage.commit(), observer, true) return observer } @@ -132,7 +127,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -140,7 +135,7 @@ export default class BoltProtocol extends BoltProtocolV2 { }) observer.prepareToHandleSingleResponse() - this._connection.write(RequestMessage.rollback(), observer, true) + this.write(RequestMessage.rollback(), observer, true) return observer } @@ -163,7 +158,7 @@ export default class BoltProtocol extends BoltProtocolV2 { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeKeys, afterKeys, beforeError, @@ -173,9 +168,9 @@ export default class BoltProtocol extends BoltProtocolV2 { }) // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( + this.write( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -184,7 +179,7 @@ export default class BoltProtocol extends BoltProtocolV2 { observer, false ) - this._connection.write(RequestMessage.pullAll(), observer, flush) + this.write(RequestMessage.pullAll(), observer, flush) return observer } @@ -218,7 +213,7 @@ export default class BoltProtocol extends BoltProtocolV2 { return new ProcedureRouteObserver({ resultObserver, - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) diff --git a/src/internal/bolt-protocol-v4x0.js b/src/internal/bolt/bolt-protocol-v4x0.js similarity index 83% rename from src/internal/bolt-protocol-v4x0.js rename to src/internal/bolt/bolt-protocol-v4x0.js index 31ebddd22..cad0642d3 100644 --- a/src/internal/bolt-protocol-v4x0.js +++ b/src/internal/bolt/bolt-protocol-v4x0.js @@ -22,9 +22,9 @@ import { ResultStreamObserver, ProcedureRouteObserver } from './stream-observers' -import { BOLT_PROTOCOL_V4_0 } from './constants' -import Bookmark from './bookmark' -import TxConfig from './tx-config' +import { BOLT_PROTOCOL_V4_0 } from '../constants' +import Bookmark from '../bookmark' +import TxConfig from '../tx-config' const CONTEXT = 'context' const DATABASE = 'database' @@ -46,7 +46,7 @@ export default class BoltProtocol extends BoltProtocolV3 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -54,7 +54,7 @@ export default class BoltProtocol extends BoltProtocolV3 { }) observer.prepareToHandleSingleResponse() - this._connection.write( + this.write( RequestMessage.begin({ bookmark, txConfig, database, mode }), observer, true @@ -83,11 +83,11 @@ export default class BoltProtocol extends BoltProtocolV3 { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, reactive: reactive, fetchSize: fetchSize, - moreFunction: this._requestMore, - discardFunction: this._requestDiscard, + moreFunction: this._requestMore.bind(this), + discardFunction: this._requestDiscard.bind(this), beforeKeys, afterKeys, beforeError, @@ -97,7 +97,7 @@ export default class BoltProtocol extends BoltProtocolV3 { }) const flushRun = reactive - this._connection.write( + this.write( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -109,22 +109,18 @@ export default class BoltProtocol extends BoltProtocolV3 { ) if (!reactive) { - this._connection.write( - RequestMessage.pull({ n: fetchSize }), - observer, - flush - ) + this.write(RequestMessage.pull({ n: fetchSize }), observer, flush) } return observer } - _requestMore (connection, stmtId, n, observer) { - connection.write(RequestMessage.pull({ stmtId, n }), observer, true) + _requestMore (stmtId, n, observer) { + this.write(RequestMessage.pull({ stmtId, n }), observer, true) } - _requestDiscard (connection, stmtId, observer) { - connection.write(RequestMessage.discard({ stmtId }), observer, true) + _requestDiscard (stmtId, observer) { + this.write(RequestMessage.discard({ stmtId }), observer, true) } _noOp () {} @@ -163,7 +159,7 @@ export default class BoltProtocol extends BoltProtocolV3 { return new ProcedureRouteObserver({ resultObserver, - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) diff --git a/src/internal/bolt-protocol-v4x1.js b/src/internal/bolt/bolt-protocol-v4x1.js similarity index 67% rename from src/internal/bolt-protocol-v4x1.js rename to src/internal/bolt/bolt-protocol-v4x1.js index 3092bf572..4ca7d322b 100644 --- a/src/internal/bolt-protocol-v4x1.js +++ b/src/internal/bolt/bolt-protocol-v4x1.js @@ -18,19 +18,37 @@ */ import BoltProtocolV4 from './bolt-protocol-v4x0' import RequestMessage, { ALL } from './request-message' -import { BOLT_PROTOCOL_V4_1 } from './constants' +import { BOLT_PROTOCOL_V4_1 } from '../constants' import { LoginObserver } from './stream-observers' export default class BoltProtocol extends BoltProtocolV4 { /** * @constructor - * @param {Connection} connection the connection. + * @param {Object} server the server informatio. * @param {Chunker} chunker the chunker. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {CreateResponseHandler} createResponseHandler Function which creates the response handler + * @param {Logger} log the logger * @param {Object} serversideRouting + * */ - constructor (connection, chunker, disableLosslessIntegers, serversideRouting) { - super(connection, chunker, disableLosslessIntegers) + constructor ( + server, + chunker, + disableLosslessIntegers, + createResponseHandler = () => null, + log, + onProtocolError, + serversideRouting + ) { + super( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) this._serversideRouting = serversideRouting } @@ -40,12 +58,11 @@ export default class BoltProtocol extends BoltProtocolV4 { initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( + this.write( RequestMessage.hello(userAgent, authToken, this._serversideRouting), observer, true diff --git a/src/internal/bolt-protocol-v4x2.js b/src/internal/bolt/bolt-protocol-v4x2.js similarity index 94% rename from src/internal/bolt-protocol-v4x2.js rename to src/internal/bolt/bolt-protocol-v4x2.js index e24f3360e..5d4279ccc 100644 --- a/src/internal/bolt-protocol-v4x2.js +++ b/src/internal/bolt/bolt-protocol-v4x2.js @@ -17,7 +17,7 @@ * limitations under the License. */ import BoltProtocolV41 from './bolt-protocol-v4x1' -import { BOLT_PROTOCOL_V4_2 } from './constants' +import { BOLT_PROTOCOL_V4_2 } from '../constants' export default class BoltProtocol extends BoltProtocolV41 { get version () { diff --git a/src/internal/bolt-protocol-v4x3.js b/src/internal/bolt/bolt-protocol-v4x3.js similarity index 94% rename from src/internal/bolt-protocol-v4x3.js rename to src/internal/bolt/bolt-protocol-v4x3.js index 18538638a..4746e7864 100644 --- a/src/internal/bolt-protocol-v4x3.js +++ b/src/internal/bolt/bolt-protocol-v4x3.js @@ -17,7 +17,7 @@ * limitations under the License. */ import BoltProtocolV42 from './bolt-protocol-v4x2' -import { BOLT_PROTOCOL_V4_3 } from './constants' +import { BOLT_PROTOCOL_V4_3 } from '../constants' import RequestMessage from './request-message' import { RouteObserver } from './stream-observers' @@ -37,7 +37,6 @@ export default class BoltProtocol extends BoltProtocolV42 { * @param {function(RawRoutingTable)} param.onCompleted * @returns {RouteObserver} the route observer */ - requestRoutingInformation ({ routingContext = {}, databaseName = null, @@ -46,12 +45,12 @@ export default class BoltProtocol extends BoltProtocolV42 { onCompleted }) { const observer = new RouteObserver({ - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) - this._connection.write( + this.write( RequestMessage.route( { ...routingContext, address: initialAddress }, databaseName diff --git a/src/internal/bolt/create.js b/src/internal/bolt/create.js new file mode 100644 index 000000000..d930ca249 --- /dev/null +++ b/src/internal/bolt/create.js @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { newError } from '../../error' +import BoltProtocolV1 from './bolt-protocol-v1' +import BoltProtocolV2 from './bolt-protocol-v2' +import BoltProtocolV3 from './bolt-protocol-v3' +import BoltProtocolV4x0 from './bolt-protocol-v4x0' +import BoltProtocolV4x1 from './bolt-protocol-v4x1' +import BoltProtocolV4x2 from './bolt-protocol-v4x2' +import BoltProtocolV4x3 from './bolt-protocol-v4x3' +import { Chunker, Dechunker } from '../chunking' +import ResponseHandler from './response-handler' + +/** + * Creates a protocol with a given version + * + * @param {object} config + * @param {number} config.version The version of the protocol + * @param {channel} config.channel The channel + * @param {Chunker} config.chunker The chunker + * @param {Dechunker} config.dechunker The dechunker + * @param {Logger} config.log The logger + * @param {ResponseHandler~Observer} config.observer Observer + * @param {boolean} config.disableLosslessIntegers Disable the lossless integers + * @param {boolean} config.serversideRouting It's using server side routing + */ +export default function create ({ + version, + chunker, + dechunker, + channel, + disableLosslessIntegers, + serversideRouting, + server, // server info + log, + observer +} = {}) { + const createResponseHandler = protocol => { + const responseHandler = new ResponseHandler({ + transformMetadata: protocol.transformMetadata.bind(protocol), + log, + observer + }) + + // reset the error handler to just handle errors and forget about the handshake promise + channel.onerror = observer.onError.bind(observer) + + // Ok, protocol running. Simply forward all messages to the dechunker + channel.onmessage = buf => dechunker.write(buf) + + // setup dechunker to dechunk messages and forward them to the message handler + dechunker.onmessage = buf => { + responseHandler.handleResponse(protocol.unpacker().unpack(buf)) + } + + return responseHandler + } + + return createProtocol( + version, + server, + chunker, + disableLosslessIntegers, + serversideRouting, + createResponseHandler, + observer.onProtocolError.bind(observer), + log + ) +} + +function createProtocol ( + version, + server, + chunker, + disableLosslessIntegers, + serversideRouting, + createResponseHandler, + onProtocolError, + log +) { + switch (version) { + case 1: + return new BoltProtocolV1( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 2: + return new BoltProtocolV2( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 3: + return new BoltProtocolV3( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 4.0: + return new BoltProtocolV4x0( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 4.1: + return new BoltProtocolV4x1( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + case 4.2: + return new BoltProtocolV4x2( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + case 4.3: + return new BoltProtocolV4x3( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + default: + throw newError('Unknown Bolt protocol version: ' + version) + } +} diff --git a/src/internal/bolt/handshake.js b/src/internal/bolt/handshake.js new file mode 100644 index 000000000..61727d665 --- /dev/null +++ b/src/internal/bolt/handshake.js @@ -0,0 +1,133 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { alloc } from '../node' +import { newError } from '../../error' + +const BOLT_MAGIC_PREAMBLE = 0x6060b017 + +function version (major, minor) { + return { + major, + minor + } +} + +function createHandshakeMessage (versions) { + if (versions.length > 4) { + throw newError('It should not have more than 4 versions of the protocol') + } + const handshakeBuffer = alloc(5 * 4) + + handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE) + + versions.forEach(version => { + if (version instanceof Array) { + const { major, minor } = version[0] + const { minor: minMinor } = version[1] + const range = minor - minMinor + handshakeBuffer.writeInt32((range << 16) | (minor << 8) | major) + } else { + const { major, minor } = version + handshakeBuffer.writeInt32((minor << 8) | major) + } + }) + + handshakeBuffer.reset() + + return handshakeBuffer +} + +function parseNegotiatedResponse (buffer) { + const h = [ + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8() + ] + if (h[0] === 0x48 && h[1] === 0x54 && h[2] === 0x54 && h[3] === 0x50) { + throw newError( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + } + return Number(h[3] + '.' + h[2]) +} + +/** + * @return {BaseBuffer} + * @private + */ +function newHandshakeBuffer () { + return createHandshakeMessage([ + [version(4, 3), version(4, 2)], + version(4, 1), + version(4, 0), + version(3, 0) + ]) +} + +/** + * This callback is displayed as a global member. + * @callback BufferConsumerCallback + * @param {buffer} buffer the remaining buffer + */ +/** + * @typedef HandshakeResult + * @property {number} protocolVersion The protocol version negotiated in the handshake + * @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists + */ +/** + * Shake hands using the channel and return the protocol version + * + * @param {Channel} channel the channel use to shake hands + * @returns {Promise} Promise of protocol version and consumeRemainingBuffer + */ +export default function handshake (channel) { + return new Promise((resolve, reject) => { + const handshakeErrorHandler = error => { + reject(error) + } + + channel.onerror = handshakeErrorHandler.bind(this) + if (channel._error) { + handshakeErrorHandler(channel._error) + } + + channel.onmessage = buffer => { + try { + // read the response buffer and initialize the protocol + const protocolVersion = parseNegotiatedResponse(buffer) + + resolve({ + protocolVersion, + consumeRemainingBuffer: consumer => { + if (buffer.hasRemaining()) { + consumer(buffer.readSlice(buffer.remaining())) + } + } + }) + } catch (e) { + reject(e) + } + } + + channel.write(newHandshakeBuffer()) + }) +} diff --git a/src/internal/bolt/index.js b/src/internal/bolt/index.js new file mode 100644 index 000000000..354a05099 --- /dev/null +++ b/src/internal/bolt/index.js @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import handshake from './handshake' +import create from './create' +import _BoltProtocol from './bolt-protocol-v4x3' +import _RawRoutingTable from './routing-table-raw' + +export * from './stream-observers' +export { ALL as FETCH_ALL } from './request-message' + +export const BoltProtocol = _BoltProtocol +export const RawRoutingTable = _RawRoutingTable + +export default { + handshake, + create +} diff --git a/src/internal/request-message.js b/src/internal/bolt/request-message.js similarity index 98% rename from src/internal/request-message.js rename to src/internal/bolt/request-message.js index 10098f32c..b36d3bddb 100644 --- a/src/internal/request-message.js +++ b/src/internal/bolt/request-message.js @@ -17,9 +17,9 @@ * limitations under the License. */ -import { ACCESS_MODE_READ } from './constants' -import { int } from '../integer' -import { assertString } from './util' +import { ACCESS_MODE_READ } from '../constants' +import { int } from '../../integer' +import { assertString } from '../util' /* eslint-disable no-unused-vars */ // Signature bytes for each request message type diff --git a/src/internal/bolt/response-handler.js b/src/internal/bolt/response-handler.js new file mode 100644 index 000000000..f02623b5b --- /dev/null +++ b/src/internal/bolt/response-handler.js @@ -0,0 +1,189 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { newError } from '../../error' + +// Signature bytes for each response message type +const SUCCESS = 0x70 // 0111 0000 // SUCCESS +const RECORD = 0x71 // 0111 0001 // RECORD +const IGNORED = 0x7e // 0111 1110 // IGNORED +const FAILURE = 0x7f // 0111 1111 // FAILURE + +function NO_OP () {} + +function NO_OP_IDENTITY (subject) { + return subject +} + +const NO_OP_OBSERVER = { + onNext: NO_OP, + onCompleted: NO_OP, + onError: NO_OP +} + +/** + * Treat the protocol responses and notify the observers + */ +export default class ResponseHandler { + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~Observer~OnErrorApplyTransformation + * @param {any} error The error + * @returns {any} The new error + */ + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~Observer~OnError + * @param {any} error The error + */ + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~MetadataTransformer + * @param {any} metadata The metadata got onSuccess + * @returns {any} The transformed metadata + */ + /** + * @typedef {Object} ResponseHandler~Observer + * @property {ResponseHandler~Observer~OnError} onError Invoke when a connection error occurs + * @property {ResponseHandler~Observer~OnError} onFailure Invoke when a protocol failure occurs + * @property {ResponseHandler~Observer~OnErrorApplyTransformation} onErrorApplyTransformation Invoke just after the failure occurs, + * before notify to respective observer. This method should transform the failure reason to the approprited one. + */ + /** + * Constructor + * @param {Object} param The params + * @param {ResponseHandler~MetadataTransformer} transformMetadata Transform metadata when the SUCCESS is received. + * @param {Channel} channel The channel used to exchange messages + * @param {Logger} log The logger + * @param {ResponseHandler~Observer} observer Object which will be notified about errors + */ + constructor ({ transformMetadata, log, observer } = {}) { + this._pendingObservers = [] + this._log = log + this._transformMetadata = transformMetadata || NO_OP_IDENTITY + this._observer = Object.assign( + { + onError: NO_OP, + onFailure: NO_OP, + onErrorApplyTransformation: NO_OP_IDENTITY + }, + observer + ) + } + + handleResponse (msg) { + const payload = msg.fields[0] + + switch (msg.signature) { + case RECORD: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`) + } + this._currentObserver.onNext(payload) + break + case SUCCESS: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`) + } + try { + const metadata = this._transformMetadata(payload) + this._currentObserver.onCompleted(metadata) + } finally { + this._updateCurrentObserver() + } + break + case FAILURE: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`) + } + try { + const error = newError(payload.message, payload.code) + this._currentFailure = this._observer.onErrorApplyTransformation( + error + ) + this._currentObserver.onError(this._currentFailure) + } finally { + this._updateCurrentObserver() + // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. + this._observer.onFailure(this._currentFailure) + } + break + case IGNORED: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`) + } + try { + if (this._currentFailure && this._currentObserver.onError) { + this._currentObserver.onError(this._currentFailure) + } else if (this._currentObserver.onError) { + this._currentObserver.onError( + newError('Ignored either because of an error or RESET') + ) + } + } finally { + this._updateCurrentObserver() + } + break + default: + this._observer.onError( + newError('Unknown Bolt protocol message: ' + msg) + ) + } + } + + /* + * Pop next pending observer form the list of observers and make it current observer. + * @protected + */ + _updateCurrentObserver () { + this._currentObserver = this._pendingObservers.shift() + } + + _queueObserver (observer) { + observer = observer || NO_OP_OBSERVER + observer.onCompleted = observer.onCompleted || NO_OP + observer.onError = observer.onError || NO_OP + observer.onNext = observer.onNext || NO_OP + if (this._currentObserver === undefined) { + this._currentObserver = observer + } else { + this._pendingObservers.push(observer) + } + return true + } + + _notifyErrorToObservers (error) { + if (this._currentObserver && this._currentObserver.onError) { + this._currentObserver.onError(error) + } + while (this._pendingObservers.length > 0) { + const observer = this._pendingObservers.shift() + if (observer && observer.onError) { + observer.onError(error) + } + } + } + + hasOngoingObservableRequests () { + return this._currentObserver != null || this._pendingObservers.length > 0 + } + + _resetFailure () { + this._currentFailure = null + } +} diff --git a/src/internal/routing-table-raw.js b/src/internal/bolt/routing-table-raw.js similarity index 75% rename from src/internal/routing-table-raw.js rename to src/internal/bolt/routing-table-raw.js index ea5854c42..498d192c3 100644 --- a/src/internal/routing-table-raw.js +++ b/src/internal/bolt/routing-table-raw.js @@ -1,4 +1,22 @@ -import Record from '../record' +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Record from '../../record' /** * Represente the raw version of the routing table @@ -6,7 +24,7 @@ import Record from '../record' export default class RawRoutingTable { /** * Constructs the raw routing table for Record based result - * @param {record} record The record which will be used get the raw routing table + * @param {Record} record The record which will be used get the raw routing table * @returns {RawRoutingTable} The raw routing table */ static ofRecord (record) { diff --git a/src/internal/stream-observers.js b/src/internal/bolt/stream-observers.js similarity index 83% rename from src/internal/stream-observers.js rename to src/internal/bolt/stream-observers.js index b54e318a8..956bc47ee 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/bolt/stream-observers.js @@ -16,10 +16,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Record from '../record' -import Connection from './connection' -import { newError, PROTOCOL_ERROR } from '../error' -import Integer from '../integer' +import Record from '../../record' +import { newError, PROTOCOL_ERROR } from '../../error' +import Integer from '../../integer' import { ALL } from './request-message' import RawRoutingTable from './routing-table-raw' @@ -45,10 +44,10 @@ class ResultStreamObserver extends StreamObserver { /** * * @param {Object} param - * @param {Connection} param.connection + * @param {Object} param.server * @param {boolean} param.reactive - * @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - - * @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - + * @param {function(stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - + * @param {function(stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - * @param {number|Integer} param.fetchSize - * @param {function(err: Error): Promise|void} param.beforeError - * @param {function(err: Error): Promise|void} param.afterError - @@ -58,7 +57,6 @@ class ResultStreamObserver extends StreamObserver { * @param {function(metadata: Object): Promise|void} param.afterComplete - */ constructor ({ - connection, reactive = false, moreFunction, discardFunction, @@ -68,12 +66,11 @@ class ResultStreamObserver extends StreamObserver { beforeKeys, afterKeys, beforeComplete, - afterComplete + afterComplete, + server } = {}) { super() - this._connection = connection - this._fieldKeys = null this._fieldLookup = null this._head = null @@ -82,6 +79,7 @@ class ResultStreamObserver extends StreamObserver { this._error = null this._observers = [] this._meta = {} + this._server = server this._beforeError = beforeError this._afterError = afterError @@ -217,7 +215,7 @@ class ResultStreamObserver extends StreamObserver { _handlePullSuccess (meta) { this._setState(_states.SUCCEEDED) const completionMetadata = Object.assign( - this._connection ? { server: this._connection.server } : {}, + this._server ? { server: this._server } : {}, this._meta, meta ) @@ -344,15 +342,10 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { if (this._discard) { - this._discardFunction(this._connection, this._queryId, this) + this._discardFunction(this._queryId, this) this._setState(_states.STREAMING) } else if (this._autoPull) { - this._moreFunction( - this._connection, - this._queryId, - this._fetchSize, - this - ) + this._moreFunction(this._queryId, this._fetchSize, this) this._setState(_states.STREAMING) } } @@ -389,26 +382,13 @@ class LoginObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection - * @param {function(err: Error)} param.beforeError - * @param {function(err: Error)} param.afterError - * @param {function(metadata)} param.beforeComplete - * @param {function(metadata)} param.afterComplete + * @param {function(err: Error)} param.onError + * @param {function(metadata)} param.onCompleted */ - constructor ({ - connection, - beforeError, - afterError, - beforeComplete, - afterComplete - } = {}) { + constructor ({ onError, onCompleted } = {}) { super() - - this._connection = connection - this._beforeError = beforeError - this._afterError = afterError - this._beforeComplete = beforeComplete - this._afterComplete = afterComplete + this._onError = onError + this._onCompleted = onCompleted } onNext (record) { @@ -418,39 +398,14 @@ class LoginObserver extends StreamObserver { } onError (error) { - if (this._beforeError) { - this._beforeError(error) - } - - this._connection._updateCurrentObserver() // make sure this exact observer will not be called again - this._connection._handleFatalError(error) // initialization errors are fatal - - if (this._afterError) { - this._afterError(error) + if (this._onError) { + this._onError(error) } } onCompleted (metadata) { - if (this._beforeComplete) { - this._beforeComplete(metadata) - } - - if (metadata) { - // read server version from the response metadata, if it is available - const serverVersion = metadata.server - if (!this._connection.version) { - this._connection.version = serverVersion - } - - // read database connection id from the response metadata, if it is available - const dbConnectionId = metadata.connection_id - if (!this._connection.databaseId) { - this._connection.databaseId = dbConnectionId - } - } - - if (this._afterComplete) { - this._afterComplete(metadata) + if (this._onCompleted) { + this._onCompleted(metadata) } } } @@ -459,14 +414,14 @@ class ResetObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection + * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onComplete */ - constructor ({ connection, onError, onComplete } = {}) { + constructor ({ onProtocolError, onError, onComplete } = {}) { super() - this._connection = connection + this._onProtocolError = onProtocolError this._onError = onError this._onComplete = onComplete } @@ -482,8 +437,8 @@ class ResetObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { @@ -514,14 +469,14 @@ class CompletedObserver extends ResultStreamObserver { } class ProcedureRouteObserver extends StreamObserver { - constructor ({ resultObserver, connection, onError, onCompleted }) { + constructor ({ resultObserver, onProtocolError, onError, onCompleted }) { super() this._resultObserver = resultObserver this._onError = onError this._onCompleted = onCompleted - this._connection = connection this._records = [] + this._onProtocolError = onProtocolError resultObserver.subscribe(this) } @@ -530,8 +485,8 @@ class ProcedureRouteObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { @@ -563,14 +518,14 @@ class RouteObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection + * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(RawRoutingTable)} param.onCompleted */ - constructor ({ connection, onError, onCompleted } = {}) { + constructor ({ onProtocolError, onError, onCompleted } = {}) { super() - this._connection = connection + this._onProtocolError = onProtocolError this._onError = onError this._onCompleted = onCompleted } @@ -586,8 +541,8 @@ class RouteObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { diff --git a/src/internal/connection-channel.js b/src/internal/connection-channel.js index 909c0666a..d3e8ffd62 100644 --- a/src/internal/connection-channel.js +++ b/src/internal/connection-channel.js @@ -21,26 +21,81 @@ import { Channel } from './node' import { Chunker, Dechunker } from './chunking' import { newError, PROTOCOL_ERROR } from '../error' import ChannelConfig from './channel-config' -import ProtocolHandshaker from './protocol-handshaker' import Connection from './connection' -import BoltProtocol from './bolt-protocol-v1' -import { ResultStreamObserver } from './stream-observers' +import Bolt from './bolt' -// Signature bytes for each response message type -const SUCCESS = 0x70 // 0111 0000 // SUCCESS -const RECORD = 0x71 // 0111 0001 // RECORD -const IGNORED = 0x7e // 0111 1110 // IGNORED -const FAILURE = 0x7f // 0111 1111 // FAILURE +let idGenerator = 0 -function NO_OP () {} +/** + * Crete new connection to the provided address. Returned connection is not connected. + * @param {ServerAddress} address - the Bolt endpoint to connect to. + * @param {Object} config - the driver configuration. + * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. + * @param {Logger} log - configured logger. + * @return {Connection} - new connection. + */ +export function createChannelConnection ( + address, + config, + errorHandler, + log, + serversideRouting = null, + createChannel = channelConfig => new Channel(channelConfig) +) { + const channelConfig = new ChannelConfig( + address, + config, + errorHandler.errorCode() + ) + + const channel = createChannel(channelConfig) + + return Bolt.handshake(channel) + .then(({ protocolVersion: version, consumeRemainingBuffer }) => { + const chunker = new Chunker(channel) + const dechunker = new Dechunker() + const createProtocol = conn => + Bolt.create({ + version, + connection: conn, + channel, + chunker, + dechunker, + disableLosslessIntegers: config.disableLosslessIntegers, + serversideRouting, + server: conn.server, + log, + observer: { + onError: conn._handleFatalError.bind(conn), + onFailure: conn._resetOnFailure.bind(conn), + onProtocolError: conn._handleProtocolError.bind(conn), + onErrorApplyTransformation: error => + conn.handleAndTransformError(error, conn._address) + } + }) + + const connection = new ChannelConnection( + channel, + errorHandler, + address, + log, + config.disableLosslessIntegers, + serversideRouting, + chunker, + createProtocol + ) -const NO_OP_OBSERVER = { - onNext: NO_OP, - onCompleted: NO_OP, - onError: NO_OP -} + // forward all pending bytes to the dechunker + consumeRemainingBuffer(buffer => dechunker.write(buffer)) -let idGenerator = 0 + return connection + }) + .catch(reason => + channel.close().then(() => { + throw reason + }) + ) +} export default class ChannelConnection extends Connection { /** @@ -50,6 +105,8 @@ export default class ChannelConnection extends Connection { * @param {ServerAddress} address - the server address to connect to. * @param {Logger} log - the configured logger. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {Chunker} chunker the chunker + * @param protocolSupplier Bolt protocol supplier */ constructor ( channel, @@ -57,7 +114,9 @@ export default class ChannelConnection extends Connection { address, log, disableLosslessIntegers = false, - serversideRouting = null + serversideRouting = null, + chunker, // to be removed, + protocolSupplier ) { super(errorHandler) @@ -66,11 +125,8 @@ export default class ChannelConnection extends Connection { this._server = { address: address.asHostPort() } this.creationTimestamp = Date.now() this._disableLosslessIntegers = disableLosslessIntegers - this._pendingObservers = [] - this._currentObserver = undefined this._ch = channel - this._dechunker = new Dechunker() - this._chunker = new Chunker(channel) + this._chunker = chunker this._log = log this._serversideRouting = serversideRouting @@ -82,10 +138,7 @@ export default class ChannelConnection extends Connection { * @private * @type {BoltProtocol} */ - this._protocol = null - - // error extracted from a FAILURE message - this._currentFailure = null + this._protocol = protocolSupplier(this) // Set to true on fatal errors, to get this out of connection pool. this._isBroken = false @@ -95,30 +148,6 @@ export default class ChannelConnection extends Connection { } } - /** - * Crete new connection to the provided address. Returned connection is not connected. - * @param {ServerAddress} address - the Bolt endpoint to connect to. - * @param {Object} config - the driver configuration. - * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. - * @param {Logger} log - configured logger. - * @return {Connection} - new connection. - */ - static create (address, config, errorHandler, log, serversideRouting = null) { - const channelConfig = new ChannelConfig( - address, - config, - errorHandler.errorCode() - ) - return new ChannelConnection( - new Channel(channelConfig), - errorHandler, - address, - log, - config.disableLosslessIntegers, - serversideRouting - ) - } - get id () { return this._id } @@ -132,72 +161,13 @@ export default class ChannelConnection extends Connection { } /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. + * Send initialization message. * @param {string} userAgent the user agent for this driver. * @param {Object} authToken the object containing auth information. * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. */ connect (userAgent, authToken) { - return this._negotiateProtocol().then(() => - this._initialize(userAgent, authToken) - ) - } - - /** - * Execute Bolt protocol handshake to initialize the protocol version. - * @return {Promise} promise resolved with the current connection if handshake is successful. Rejected promise otherwise. - */ - _negotiateProtocol () { - const protocolHandshaker = new ProtocolHandshaker( - this, - this._ch, - this._chunker, - this._disableLosslessIntegers, - this._log, - this._serversideRouting - ) - - return new Promise((resolve, reject) => { - const handshakeErrorHandler = error => { - this._handleFatalError(error) - reject(error) - } - - this._ch.onerror = handshakeErrorHandler.bind(this) - if (this._ch._error) { - // channel is already broken - handshakeErrorHandler(this._ch._error) - } - - this._ch.onmessage = buffer => { - try { - // read the response buffer and initialize the protocol - this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer) - - // reset the error handler to just handle errors and forget about the handshake promise - this._ch.onerror = this._handleFatalError.bind(this) - - // Ok, protocol running. Simply forward all messages to the dechunker - this._ch.onmessage = buf => this._dechunker.write(buf) - - // setup dechunker to dechunk messages and forward them to the message handler - this._dechunker.onmessage = buf => { - this._handleMessage(this._protocol.unpacker().unpack(buf)) - } - // forward all pending bytes to the dechunker - if (buffer.hasRemaining()) { - this._dechunker.write(buffer.readSlice(buffer.remaining())) - } - - resolve(this) - } catch (e) { - this._handleFatalError(e) - reject(e) - } - } - - protocolHandshaker.writeHandshakeRequest() - }) + return this._initialize(userAgent, authToken) } /** @@ -213,7 +183,22 @@ export default class ChannelConnection extends Connection { userAgent, authToken, onError: err => reject(err), - onComplete: () => resolve(self) + onComplete: metadata => { + if (metadata) { + // read server version from the response metadata, if it is available + const serverVersion = metadata.server + if (!this.version) { + this.version = serverVersion + } + + // read database connection id from the response metadata, if it is available + const dbConnectionId = metadata.connection_id + if (!this.databaseId) { + this.databaseId = dbConnectionId + } + } + resolve(self) + } }) }) } @@ -248,35 +233,6 @@ export default class ChannelConnection extends Connection { return this._server } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message, observer, flush) { - const queued = this._queueObserver(observer) - - if (queued) { - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} C: ${message}`) - } - - this._protocol - .packer() - .packStruct( - message.signature, - message.fields.map(field => this._packable(field)) - ) - - this._chunker.messageBoundary() - - if (flush) { - this._chunker.flush() - } - } - } - /** * "Fatal" means the connection is dead. Only call this if something * happens that cannot be recovered from. This will lead to all subscribers @@ -294,82 +250,20 @@ export default class ChannelConnection extends Connection { ) } - if (this._currentObserver && this._currentObserver.onError) { - this._currentObserver.onError(this._error) - } - while (this._pendingObservers.length > 0) { - const observer = this._pendingObservers.shift() - if (observer && observer.onError) { - observer.onError(this._error) - } - } + this._protocol.notifyFatalError(this._error) } - _handleMessage (msg) { - if (this._isBroken) { - // ignore all incoming messages when this connection is broken. all previously pending observers failed - // with the fatal error. all future observers will fail with same fatal error. - return - } - - const payload = msg.fields[0] + /** + * This method still here because it's used by the {@link PooledConnectionProvider} + * + * @param {any} observer + */ + _queueObserver (observer) { + return this._protocol.queueObserverIfProtocolIsNotBroken(observer) + } - switch (msg.signature) { - case RECORD: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`) - } - this._currentObserver.onNext(payload) - break - case SUCCESS: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`) - } - try { - const metadata = this._protocol.transformMetadata(payload) - this._currentObserver.onCompleted(metadata) - } finally { - this._updateCurrentObserver() - } - break - case FAILURE: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`) - } - try { - const error = newError(payload.message, payload.code) - this._currentFailure = this.handleAndTransformError( - error, - this._address - ) - this._currentObserver.onError(this._currentFailure) - } finally { - this._updateCurrentObserver() - // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. - this._resetOnFailure() - } - break - case IGNORED: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`) - } - try { - if (this._currentFailure && this._currentObserver.onError) { - this._currentObserver.onError(this._currentFailure) - } else if (this._currentObserver.onError) { - this._currentObserver.onError( - newError('Ignored either because of an error or RESET') - ) - } - } finally { - this._updateCurrentObserver() - } - break - default: - this._handleFatalError( - newError('Unknown Bolt protocol message: ' + msg) - ) - } + hasOngoingObservableRequests () { + return this._protocol.hasOngoingObservableRequests() } /** @@ -400,39 +294,20 @@ export default class ChannelConnection extends Connection { _resetOnFailure () { this._protocol.reset({ onError: () => { - this._currentFailure = null + this._protocol.resetFailure() }, onComplete: () => { - this._currentFailure = null + this._protocol.resetFailure() } }) } - _queueObserver (observer) { - if (this._isBroken) { - if (observer && observer.onError) { - observer.onError(this._error) - } - return false - } - observer = observer || NO_OP_OBSERVER - observer.onCompleted = observer.onCompleted || NO_OP - observer.onError = observer.onError || NO_OP - observer.onNext = observer.onNext || NO_OP - if (this._currentObserver === undefined) { - this._currentObserver = observer - } else { - this._pendingObservers.push(observer) - } - return true - } - /* * Pop next pending observer form the list of observers and make it current observer. * @protected */ _updateCurrentObserver () { - this._currentObserver = this._pendingObservers.shift() + this._protocol.updateCurrentObserver() } /** Check if this connection is in working condition */ @@ -466,12 +341,8 @@ export default class ChannelConnection extends Connection { return `Connection [${this.id}][${this.databaseId || ''}]` } - _packable (value) { - return this._protocol.packer().packable(value) - } - _handleProtocolError (message) { - this._currentFailure = null + this._protocol.resetFailure() this._updateCurrentObserver() const error = newError(message, PROTOCOL_ERROR) this._handleFatalError(error) diff --git a/src/internal/connection-provider-direct.js b/src/internal/connection-provider-direct.js index 57518afef..48c0ca6ef 100644 --- a/src/internal/connection-provider-direct.js +++ b/src/internal/connection-provider-direct.js @@ -19,7 +19,7 @@ import PooledConnectionProvider from './connection-provider-pooled' import DelegateConnection from './connection-delegate' -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import { BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V3 } from './constants' export default class DirectConnectionProvider extends PooledConnectionProvider { @@ -40,25 +40,24 @@ export default class DirectConnectionProvider extends PooledConnectionProvider { } async _hasProtocolVersion (versionPredicate) { - const connection = ChannelConnection.create( + const connection = await createChannelConnection( this._address, this._config, this._createConnectionErrorHandler(), this._log ) - try { - await connection._negotiateProtocol() + const protocolVersion = connection.protocol() + ? connection.protocol().version + : null - const protocol = connection.protocol() - if (protocol) { - return versionPredicate(protocol.version) - } + await connection.close() - return false - } finally { - await connection.close() + if (protocolVersion) { + return versionPredicate(protocolVersion) } + + return false } async supportsMultiDb () { diff --git a/src/internal/connection-provider-pooled.js b/src/internal/connection-provider-pooled.js index 20b925d74..43d7dfc4c 100644 --- a/src/internal/connection-provider-pooled.js +++ b/src/internal/connection-provider-pooled.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import Pool from './pool' import PoolConfig from './pool-config' import ConnectionErrorHandler from './connection-error-handler' @@ -39,7 +39,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { this._createChannelConnection = createChannelConnectionHook || (address => { - return ChannelConnection.create( + return createChannelConnection( address, this._config, this._createConnectionErrorHandler(), @@ -72,15 +72,17 @@ export default class PooledConnectionProvider extends ConnectionProvider { * @access private */ _createConnection (address, release) { - const connection = this._createChannelConnection(address) - connection._release = () => release(address, connection) - this._openConnections[connection.id] = connection - - return connection.connect(this._userAgent, this._authToken).catch(error => { - // let's destroy this connection - this._destroyConnection(connection) - // propagate the error because connection failed to connect / initialize - throw error + return this._createChannelConnection(address).then(connection => { + connection._release = () => release(address, connection) + this._openConnections[connection.id] = connection + return connection + .connect(this._userAgent, this._authToken) + .catch(error => { + // let's destroy this connection + this._destroyConnection(connection) + // propagate the error because connection failed to connect / initialize + throw error + }) }) } diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index e8741091a..d33b74100 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -29,7 +29,7 @@ import ConnectionErrorHandler from './connection-error-handler' import DelegateConnection from './connection-delegate' import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy' import Bookmark from './bookmark' -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import { int } from '../integer' import { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0 } from './constants' @@ -53,7 +53,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider routingTablePurgeDelay }) { super({ id, config, log, userAgent, authToken }, address => { - return ChannelConnection.create( + return createChannelConnection( address, this._config, this._createConnectionErrorHandler(), @@ -164,27 +164,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider let lastError for (let i = 0; i < addresses.length; i++) { - const connection = ChannelConnection.create( - addresses[i], - this._config, - this._createConnectionErrorHandler(), - this._log - ) - try { - await connection._negotiateProtocol() + const connection = await createChannelConnection( + this._address, + this._config, + this._createConnectionErrorHandler(), + this._log + ) - const protocol = connection.protocol() - if (protocol) { - return versionPredicate(protocol.version) + const protocolVersion = connection.protocol() + ? connection.protocol().version + : null + + await connection.close() + + if (protocolVersion) { + return versionPredicate(protocolVersion) } return false } catch (error) { lastError = error - } finally { - await connection.close() } + + return false } if (lastError) { diff --git a/src/internal/connection.js b/src/internal/connection.js index 709977a05..022bb9c44 100644 --- a/src/internal/connection.js +++ b/src/internal/connection.js @@ -17,8 +17,7 @@ * limitations under the License. */ -import { ResultStreamObserver } from './stream-observers' -import BoltProtocol from './bolt-protocol-v1' +import { ResultStreamObserver, BoltProtocol } from './bolt' export default class Connection { /** diff --git a/src/internal/connectivity-verifier.js b/src/internal/connectivity-verifier.js index 02884c9cc..911ebdaad 100644 --- a/src/internal/connectivity-verifier.js +++ b/src/internal/connectivity-verifier.js @@ -19,7 +19,7 @@ import ConnectionHolder from './connection-holder' import { READ } from '../driver' -import { ResultStreamObserver } from './stream-observers' +import { ResultStreamObserver } from './bolt' /** * Verifies connectivity using the given connection provider. diff --git a/src/internal/protocol-handshaker.js b/src/internal/protocol-handshaker.js deleted file mode 100644 index 3d6f5e1a8..000000000 --- a/src/internal/protocol-handshaker.js +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { alloc } from './node' -import { newError } from '../error' -import BoltProtocolV1 from './bolt-protocol-v1' -import BoltProtocolV2 from './bolt-protocol-v2' -import BoltProtocolV3 from './bolt-protocol-v3' -import BoltProtocolV4x0 from './bolt-protocol-v4x0' -import BoltProtocolV4x1 from './bolt-protocol-v4x1' -import BoltProtocolV4x2 from './bolt-protocol-v4x2' -import BoltProtocolV4x3 from './bolt-protocol-v4x3' -const BOLT_MAGIC_PREAMBLE = 0x6060b017 - -export default class ProtocolHandshaker { - /** - * @constructor - * @param {Connection} connection the connection owning this protocol. - * @param {Channel} channel the network channel. - * @param {Chunker} chunker the message chunker. - * @param {boolean} disableLosslessIntegers flag to use native JS numbers. - * @param {Logger} log the logger. - */ - constructor ( - connection, - channel, - chunker, - disableLosslessIntegers, - log, - serversideRouting = null - ) { - this._connection = connection - this._channel = channel - this._chunker = chunker - this._disableLosslessIntegers = disableLosslessIntegers - this._log = log - this._serversideRouting = serversideRouting - } - - /** - * Write a Bolt handshake into the underlying network channel. - */ - writeHandshakeRequest () { - this._channel.write(newHandshakeBuffer()) - } - - /** - * Read the given handshake response and create the negotiated bolt protocol. - * @param {BaseBuffer} buffer byte buffer containing the handshake response. - * @return {BoltProtocol} bolt protocol corresponding to the version suggested by the database. - * @throws {Neo4jError} when bolt protocol can't be instantiated. - */ - createNegotiatedProtocol (buffer) { - const h = [ - buffer.readUInt8(), - buffer.readUInt8(), - buffer.readUInt8(), - buffer.readUInt8() - ] - if (h[0] === 0x48 && h[1] === 0x54 && h[2] === 0x54 && h[3] === 0x50) { - throw newError( - 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' - ) - } - const negotiatedVersion = Number(h[3] + '.' + h[2]) - if (this._log.isDebugEnabled()) { - this._log.debug( - `${this._connection} negotiated protocol version ${negotiatedVersion}` - ) - } - return this._createProtocolWithVersion(negotiatedVersion) - } - - /** - * @return {BoltProtocol} - * @private - */ - _createProtocolWithVersion (version) { - switch (version) { - case 1: - return new BoltProtocolV1( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 2: - return new BoltProtocolV2( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 3: - return new BoltProtocolV3( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 4.0: - return new BoltProtocolV4x0( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 4.1: - return new BoltProtocolV4x1( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - case 4.2: - return new BoltProtocolV4x2( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - case 4.3: - return new BoltProtocolV4x3( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - default: - throw newError('Unknown Bolt protocol version: ' + version) - } - } -} - -/** - * @return {BaseBuffer} - * @private - */ -function newHandshakeBuffer () { - const handshakeBuffer = alloc(5 * 4) - - // magic preamble - handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE) - - // proposed versions - handshakeBuffer.writeInt32((1 << 16) | (3 << 8) | 4) - handshakeBuffer.writeInt32((1 << 8) | 4) - handshakeBuffer.writeInt32(4) - handshakeBuffer.writeInt32(3) - - // reset the reader position - handshakeBuffer.reset() - - return handshakeBuffer -} diff --git a/src/internal/rediscovery.js b/src/internal/rediscovery.js index 704633069..9ab50ac2e 100644 --- a/src/internal/rediscovery.js +++ b/src/internal/rediscovery.js @@ -17,7 +17,7 @@ * limitations under the License. */ import RoutingTable from './routing-table' -import RawRoutingTable from './routing-table-raw' +import { RawRoutingTable } from './bolt' import Session from '../session' import ServerAddress from './server-address' import { newError, SERVICE_UNAVAILABLE } from '../error' diff --git a/src/result.js b/src/result.js index 6dc77859f..74169f03e 100644 --- a/src/result.js +++ b/src/result.js @@ -19,7 +19,7 @@ import ResultSummary from './result-summary' import { EMPTY_CONNECTION_HOLDER } from './internal/connection-holder' -import { ResultStreamObserver } from './internal/stream-observers' +import { ResultStreamObserver } from './internal/bolt' const DEFAULT_ON_ERROR = error => { console.log('Uncaught error when processing result: ' + error) diff --git a/src/session.js b/src/session.js index 42cccfa8c..42ecdb01b 100644 --- a/src/session.js +++ b/src/session.js @@ -16,10 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { - ResultStreamObserver, - FailedObserver -} from './internal/stream-observers' +import { ResultStreamObserver, FailedObserver } from './internal/bolt' import Result from './result' import Transaction from './transaction' import { newError } from './error' diff --git a/src/transaction.js b/src/transaction.js index ccb02a8a6..4734ec54b 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -28,7 +28,7 @@ import { ResultStreamObserver, FailedObserver, CompletedObserver -} from './internal/stream-observers' +} from './internal/bolt' import { newError } from './error' /** diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt/bolt-protocol-v1.test.js similarity index 71% rename from test/internal/bolt-protocol-v1.test.js rename to test/internal/bolt/bolt-protocol-v1.test.js index a21046e06..a892d67b4 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt/bolt-protocol-v1.test.js @@ -17,13 +17,13 @@ * limitations under the License. */ -import BoltProtocolV1 from '../../src/internal/bolt-protocol-v1' -import RequestMessage from '../../src/internal/request-message' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' -import utils from './test-utils' -import { LoginObserver } from '../../src/internal/stream-observers' +import BoltProtocolV1 from '../../../src/internal/bolt/bolt-protocol-v1' +import RequestMessage from '../../../src/internal/bolt/request-message' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' +import utils from '../test-utils' +import { LoginObserver } from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV1', () => { beforeEach(() => { @@ -55,7 +55,9 @@ describe('#unit BoltProtocolV1', () => { it('should initialize the connection', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const onError = _error => {} const onComplete = () => {} @@ -71,20 +73,20 @@ describe('#unit BoltProtocolV1', () => { expect(observer).toBeTruthy() expect(observer instanceof LoginObserver).toBeTruthy() - expect(observer._afterError).toBe(onError) - expect(observer._afterComplete).toBe(onComplete) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.init(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -94,31 +96,35 @@ describe('#unit BoltProtocolV1', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.run(query, parameters) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should reset the connection', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.reset() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.reset()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.reset()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should begin a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const bookmark = new Bookmark('neo4j:bookmark:v1:tx42') @@ -128,42 +134,46 @@ describe('#unit BoltProtocolV1', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters()) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, false]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, false]) }) it('should commit a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage(RequestMessage.run('COMMIT', {})) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[0]).toBeMessage(RequestMessage.run('COMMIT', {})) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should rollback a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage(RequestMessage.run('ROLLBACK', {})) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[0]).toBeMessage(RequestMessage.run('ROLLBACK', {})) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should return correct bolt version number', () => { diff --git a/test/internal/bolt-protocol-v2.test.js b/test/internal/bolt/bolt-protocol-v2.test.js similarity index 89% rename from test/internal/bolt-protocol-v2.test.js rename to test/internal/bolt/bolt-protocol-v2.test.js index 502e7c728..06abc741a 100644 --- a/test/internal/bolt-protocol-v2.test.js +++ b/test/internal/bolt/bolt-protocol-v2.test.js @@ -17,8 +17,8 @@ * limitations under the License. */ -import BoltProtocolV2 from '../../src/internal/bolt-protocol-v2' -import utils from './test-utils' +import BoltProtocolV2 from '../../../src/internal/bolt/bolt-protocol-v2' +import utils from '../test-utils' describe('#unit BoltProtocolV2', () => { beforeEach(() => { diff --git a/test/internal/bolt-protocol-v3.test.js b/test/internal/bolt/bolt-protocol-v3.test.js similarity index 77% rename from test/internal/bolt-protocol-v3.test.js rename to test/internal/bolt/bolt-protocol-v3.test.js index a8d52067e..24e432ca2 100644 --- a/test/internal/bolt-protocol-v3.test.js +++ b/test/internal/bolt/bolt-protocol-v3.test.js @@ -17,16 +17,16 @@ * limitations under the License. */ -import BoltProtocolV3 from '../../src/internal/bolt-protocol-v3' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' +import BoltProtocolV3 from '../../../src/internal/bolt/bolt-protocol-v3' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' import { ProcedureRouteObserver, ResultStreamObserver -} from '../../src/internal/stream-observers' +} from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV3', () => { beforeEach(() => { @@ -50,18 +50,19 @@ describe('#unit BoltProtocolV3', () => { it('should initialize connection', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const clientName = 'js-driver/1.2.3' const authToken = { username: 'neo4j', password: 'secret' } const observer = protocol.initialize({ userAgent: clientName, authToken }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.hello(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { @@ -75,6 +76,7 @@ describe('#unit BoltProtocolV3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -85,18 +87,18 @@ describe('#unit BoltProtocolV3', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { @@ -110,6 +112,7 @@ describe('#unit BoltProtocolV3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -117,36 +120,38 @@ describe('#unit BoltProtocolV3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should commit', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.commit()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.commit()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should rollback', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.rollback()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -158,6 +163,7 @@ describe('#unit BoltProtocolV3', () => { it('should request the routing table from the correct procedure', () => { const expectedResultObserver = new ResultStreamObserver() const protocol = new SpiedBoltProtocolV3(expectedResultObserver) + utils.spyProtocolWrite(protocol) const routingContext = { abc: 'context ' } const sessionContext = { bookmark: 'book' } const onError = () => {} @@ -194,6 +200,7 @@ describe('#unit BoltProtocolV3', () => { function verifyError (fn) { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) expect(() => fn(protocol)).toThrowError( 'Driver is connected to the database that does not support multiple databases. ' + diff --git a/test/internal/bolt-protocol-v4x0.test.js b/test/internal/bolt/bolt-protocol-v4x0.test.js similarity index 80% rename from test/internal/bolt-protocol-v4x0.test.js rename to test/internal/bolt/bolt-protocol-v4x0.test.js index 2e2f3c932..56ddbb1c3 100644 --- a/test/internal/bolt-protocol-v4x0.test.js +++ b/test/internal/bolt/bolt-protocol-v4x0.test.js @@ -17,16 +17,16 @@ * limitations under the License. */ -import BoltProtocolV4x0 from '../../src/internal/bolt-protocol-v4x0' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' +import BoltProtocolV4x0 from '../../../src/internal/bolt/bolt-protocol-v4x0' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' import { ProcedureRouteObserver, ResultStreamObserver -} from '../../src/internal/stream-observers' +} from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV4x0', () => { beforeEach(() => { @@ -45,6 +45,7 @@ describe('#unit BoltProtocolV4x0', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x0(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -56,9 +57,9 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -66,9 +67,9 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pull()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { @@ -83,6 +84,7 @@ describe('#unit BoltProtocolV4x0', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x0(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -91,12 +93,12 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, database, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -108,6 +110,7 @@ describe('#unit BoltProtocolV4x0', () => { it('should request the routing table from the correct procedure', () => { const expectedResultObserver = new ResultStreamObserver() const protocol = new SpiedBoltProtocolV4x0(expectedResultObserver) + utils.spyProtocolWrite(protocol) const routingContext = { abc: 'context ' } const sessionContext = { bookmark: 'book' } const databaseName = 'the name' diff --git a/test/internal/bolt-protocol-v4x3.test.js b/test/internal/bolt/bolt-protocol-v4x3.test.js similarity index 69% rename from test/internal/bolt-protocol-v4x3.test.js rename to test/internal/bolt/bolt-protocol-v4x3.test.js index b8e2cdcbb..4b8134902 100644 --- a/test/internal/bolt-protocol-v4x3.test.js +++ b/test/internal/bolt/bolt-protocol-v4x3.test.js @@ -17,13 +17,13 @@ * limitations under the License. */ -import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' -import { RouteObserver } from '../../src/internal/stream-observers' +import BoltProtocolV4x3 from '../../../src/internal/bolt/bolt-protocol-v4x3' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' +import { RouteObserver } from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV4x3', () => { beforeEach(() => { @@ -33,6 +33,7 @@ describe('#unit BoltProtocolV4x3', () => { it('should request routing information', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const routingContext = { someContextParam: 'value' } const databaseName = 'name' @@ -41,13 +42,13 @@ describe('#unit BoltProtocolV4x3', () => { databaseName }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.route({ ...routingContext, address: null }, databaseName) ) - expect(recorder.observers).toEqual([observer]) + expect(protocol.observers).toEqual([observer]) expect(observer).toEqual(jasmine.any(RouteObserver)) - expect(recorder.flushes).toEqual([true]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { @@ -62,6 +63,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -73,9 +75,9 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -83,9 +85,9 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pull()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { const database = 'testdb' @@ -99,6 +101,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -107,12 +110,12 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, database, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -138,18 +141,19 @@ describe('#unit BoltProtocolV4x3', () => { it('should initialize connection', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const clientName = 'js-driver/1.2.3' const authToken = { username: 'neo4j', password: 'secret' } const observer = protocol.initialize({ userAgent: clientName, authToken }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.hello(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should begin a transaction', () => { @@ -163,6 +167,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -170,35 +175,37 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should commit', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.commit()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.commit()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should rollback', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.rollback()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) }) diff --git a/test/internal/bolt/index.test.js b/test/internal/bolt/index.test.js new file mode 100644 index 000000000..fcf50e206 --- /dev/null +++ b/test/internal/bolt/index.test.js @@ -0,0 +1,329 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Bolt from '../../../src/internal/bolt' +import DummyChannel from '../dummy-channel' +import { alloc } from '../../../src/internal/node' +import { newError } from '../../../src/error' +import { Chunker, Dechunker } from '../../../src/internal/chunking' +import Logger from '../../../src/internal/logger' + +import BoltProtocolV1 from '../../../src/internal/bolt/bolt-protocol-v1' +import BoltProtocolV2 from '../../../src/internal/bolt/bolt-protocol-v2' +import BoltProtocolV3 from '../../../src/internal/bolt/bolt-protocol-v3' +import BoltProtocolV4x0 from '../../../src/internal/bolt/bolt-protocol-v4x0' +import BoltProtocolV4x1 from '../../../src/internal/bolt/bolt-protocol-v4x1' +import BoltProtocolV4x2 from '../../../src/internal/bolt/bolt-protocol-v4x2' +import BoltProtocolV4x3 from '../../../src/internal/bolt/bolt-protocol-v4x3' + +describe('#unit Bolt', () => { + describe('handshake', () => { + it('should write the correct handshake message', () => { + const { channel } = subject() + expect(channel.written.length).toBe(1) + const writtenBuffer = channel.written[0] + + const boltMagicPreamble = '60 60 b0 17' + const protocolVersion4x3to4x2 = '00 01 03 04' + const protocolVersion4x1 = '00 00 01 04' + const protocolVersion4x0 = '00 00 00 04' + const protocolVersion3 = '00 00 00 03' + + expect(writtenBuffer.toHex()).toEqual( + `${boltMagicPreamble} ${protocolVersion4x3to4x2} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` + ) + }) + + it('should handle a successful handshake without reaining buffer', done => { + const { channel, handshakePromise } = subject() + const expectedProtocolVersion = 4.3 + + handshakePromise + .then(({ protocolVersion, consumeRemainingBuffer }) => { + expect(protocolVersion).toEqual(expectedProtocolVersion) + consumeRemainingBuffer(() => + done.fail('Should not have remaining buffer') + ) + done() + }) + .catch(done.fail.bind(done)) + + channel.onmessage(packedHandshakeMessage(expectedProtocolVersion)) + }) + + it('should handle a successful handshake with reaining buffer', done => { + const { channel, handshakePromise } = subject() + const expectedProtocolVersion = 4.3 + const expectedExtraBuffer = createExtraBuffer() + handshakePromise + .then(({ protocolVersion, consumeRemainingBuffer }) => { + expect(protocolVersion).toEqual(expectedProtocolVersion) + let consumeRemainingBufferCalled = false + consumeRemainingBuffer(buffer => { + consumeRemainingBufferCalled = true + expect(buffer.toHex()).toEqual(expectedExtraBuffer.toHex()) + }) + expect(consumeRemainingBufferCalled).toBeTruthy() + done() + }) + .catch(done.fail.bind(done)) + + channel.onmessage( + packedHandshakeMessage(expectedProtocolVersion, expectedExtraBuffer) + ) + }) + + it('should fail if the server responds with the http header', done => { + const { channel, handshakePromise } = subject() + const httpMagicNumber = 1213486160 + + handshakePromise + .then(() => done.fail('should not resolve an failure')) + .catch(error => { + expect(error).toEqual( + newError( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + ) + done() + }) + + channel.onmessage(packedHandshakeMessage(httpMagicNumber)) + }) + it('should handle a failed handshake', done => { + const { channel, handshakePromise } = subject() + const expectedError = new Error('Something got wrong') + + handshakePromise + .then(() => done.fail('should not resolve an failure')) + .catch(error => { + expect(error).toBe(expectedError) + done() + }) + + channel.onerror(expectedError) + }) + + it('should handle an already broken channel', done => { + const channel = new DummyChannel() + const expectedError = new Error('Something got wrong') + channel._error = expectedError + const { handshakePromise } = subject({ channel }) + + handshakePromise + .then(() => done.fail('should resolve an failure')) + .catch(error => { + expect(error).toBe(expectedError) + done() + }) + }) + + function subject ({ channel = new DummyChannel() } = {}) { + return { + channel, + handshakePromise: Bolt.handshake(channel) + } + } + + function packedHandshakeMessage (protocolVersion, extraBuffer) { + const major = Math.floor(protocolVersion) + const minor = protocolVersion * 10 - major * 10 + const bufferLength = 4 + (extraBuffer ? extraBuffer.length : 0) + const result = alloc(bufferLength) + result.putInt32(0, (minor << 8) | major) + if (extraBuffer) { + result.putBytes(4, extraBuffer) + } + result.reset() + return result + } + + function createExtraBuffer () { + const buffer = alloc(16) + buffer.putInt32(0, 1970) + buffer.putInt32(4, 1984) + buffer.putInt32(8, 2010) + buffer.putInt32(12, 2012) + buffer.reset() + return buffer + } + }) + + describe('create', () => { + forEachAvailableProtcol(({ version, protocolClass }) => { + it(`it should create protocol ${version}`, () => { + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol.version).toEqual(version) + expect(protocol).toEqual(jasmine.any(protocolClass)) + expect(protocol._server).toBe(params.server) + expect(protocol._packer).toEqual(protocol._createPacker(params.chunker)) + expect(protocol._unpacker).toEqual( + protocol._createUnpacker(params.disableLosslessIntegers) + ) + expect(protocol._log).toEqual(params.log) + const expectedError = 'Some error' + protocol._onProtocolError(expectedError) + expect(params.observer.protocolErrors).toEqual([expectedError]) + }) + + it(`it should configure configure the correct ResponseHandler for version ${version}`, () => { + const expectedFailure = 'expected failure' + const expectedError = 'expected error' + const expectedErrorAppliedTransformation = + 'expected error applied transformation' + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol._responseHandler).toBeDefined() + const responseHandler = protocol._responseHandler + expect(responseHandler._log).toBe(params.log) + + const observer = responseHandler._observer + observer.onError(expectedError) + observer.onFailure(expectedFailure) + observer.onErrorApplyTransformation(expectedErrorAppliedTransformation) + + expect(params.observer.failures).toEqual([expectedFailure]) + expect(params.observer.errors).toEqual([expectedError]) + expect(params.observer.errorsAppliedTransformation).toEqual([ + expectedErrorAppliedTransformation + ]) + }) + + it(`it should configure the channel.onerror to call the observer for version ${version}`, () => { + const expectedError = 'expected error' + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol).toBeDefined() + + params.channel.onerror(expectedError) + + expect(params.observer.errors).toEqual([expectedError]) + }) + + it(`it should configure the channel.onmessage to dechunk and call the response handler ${version}`, () => { + const params = createBoltCreateParams({ version }) + let receivedMessage = null + const expectedMessage = { + signature: 0x10, + fields: [123] + } + const protocol = Bolt.create(params) + protocol._responseHandler.handleResponse = msg => { + receivedMessage = msg + } + + protocol.packer().packStruct( + expectedMessage.signature, + expectedMessage.fields.map(field => protocol.packer().packable(field)) + ) + params.chunker.messageBoundary() + params.chunker.flush() + params.channel.onmessage(params.channel.toBuffer()) + + expect(receivedMessage).not.toBeNull() + expect(receivedMessage.signature).toEqual(expectedMessage.signature) + expect(receivedMessage.fields).toEqual(expectedMessage.fields) + }) + }) + + forEachUnknownProtocolVersion(version => { + it(`it should not create unknown protocol ${version}`, () => { + try { + Bolt.create(createBoltCreateParams({ version })) + fail(`should not create protocol version ${version} with success`) + } catch (error) { + expect(error).toEqual( + newError('Unknown Bolt protocol version: ' + version) + ) + } + }) + }) + + function forEachAvailableProtcol (lambda) { + function v (version, protocolClass) { + return { version, protocolClass } + } + + const availableProtocols = [ + v(1, BoltProtocolV1), + v(2, BoltProtocolV2), + v(3, BoltProtocolV3), + v(4.0, BoltProtocolV4x0), + v(4.1, BoltProtocolV4x1), + v(4.2, BoltProtocolV4x2), + v(4.3, BoltProtocolV4x3) + ] + + availableProtocols.forEach(lambda) + } + + function forEachUnknownProtocolVersion (lambda) { + ;[0, -1, 'javascript', undefined, null, 1.1].forEach(lambda) + } + + function createBoltCreateParams ({ version } = {}) { + const server = {} + const channel = new DummyChannel() + const chunker = new Chunker(channel) + const dechunker = new Dechunker() + const disableLosslessIntegers = false + const serversideRouting = false + const log = Logger.noOp() + const observer = createObserver() + return { + version, + server, + channel, + chunker, + dechunker, + disableLosslessIntegers, + serversideRouting, + log, + observer + } + } + + function createObserver () { + const protocolErrors = [] + const errorsAppliedTransformation = [] + const failures = [] + const errors = [] + return { + protocolErrors, + failures, + errors, + errorsAppliedTransformation, + onError: error => errors.push(error), + onFailure: failure => failures.push(failure), + onErrorApplyTransformation: error => { + errorsAppliedTransformation.push(error) + return error + }, + onProtocolError: protocolError => protocolErrors.push(protocolError) + } + } + }) +}) diff --git a/test/internal/request-message.test.js b/test/internal/bolt/request-message.test.js similarity index 96% rename from test/internal/request-message.test.js rename to test/internal/bolt/request-message.test.js index fbd2ebf1f..abe3bbacd 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/bolt/request-message.test.js @@ -17,11 +17,11 @@ * limitations under the License. */ -import RequestMessage from '../../src/internal/request-message' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { int } from '../../src' -import { READ, WRITE } from '../../src/driver' +import RequestMessage from '../../../src/internal/bolt/request-message' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { int } from '../../../src' +import { READ, WRITE } from '../../../src/driver' describe('#unit RequestMessage', () => { it('should create INIT message', () => { diff --git a/test/internal/routing-table-raw.test.js b/test/internal/bolt/routing-table-raw.test.js similarity index 86% rename from test/internal/routing-table-raw.test.js rename to test/internal/bolt/routing-table-raw.test.js index 29034ccd1..426db6dcf 100644 --- a/test/internal/routing-table-raw.test.js +++ b/test/internal/bolt/routing-table-raw.test.js @@ -1,5 +1,23 @@ -import RawRoutingTable from '../../src/internal/routing-table-raw' -import Record from '../../src/record' +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RawRoutingTable from '../../../src/internal/bolt/routing-table-raw' +import Record from '../../../src/record' describe('#unit RawRoutingTable', () => { describe('ofNull', () => { diff --git a/test/internal/stream-observer.test.js b/test/internal/bolt/stream-observer.test.js similarity index 83% rename from test/internal/stream-observer.test.js rename to test/internal/bolt/stream-observer.test.js index 0939dd6b0..4fb2b049c 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/bolt/stream-observer.test.js @@ -17,24 +17,23 @@ * limitations under the License. */ -import FakeConnection from './fake-connection' import { ResultStreamObserver, RouteObserver, ProcedureRouteObserver -} from '../../src/internal/stream-observers' -import RawRoutingTable from '../../src/internal/routing-table-raw' -import { PROTOCOL_ERROR, newError } from '../../src/error' -import Record from '../../src/record' +} from '../../../src/internal/bolt/stream-observers' +import { RawRoutingTable } from '../../../src/internal/bolt' +import { PROTOCOL_ERROR, newError } from '../../../src/error' +import Record from '../../../src/record' const NO_OP = () => {} describe('#unit ResultStreamObserver', () => { - it('remembers resolved connection', () => { - const connection = new FakeConnection() - const streamObserver = newStreamObserver(connection) + it('remembers resolved server', () => { + const server = { address: '192.168.0.1' } + const streamObserver = newStreamObserver(server) - expect(streamObserver._connection).toBe(connection) + expect(streamObserver._server).toBe(server) }) it('remembers subscriber', () => { @@ -239,23 +238,27 @@ describe('#unit RouteObserver', () => { onError: metadata => { onErrorCalled = true expect(metadata).toBe(expectedError) - } + }, + onProtocolError: () => {} }).onError(expectedError) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError when a protocol error occurs', () => { - const connection = new FakeConnection() + it('should call onProtocolError when a protocol error occurs', () => { + let onProtocolErrorCalled = false + const expectedError = newError('something wrong', PROTOCOL_ERROR) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedError.message) + } }).onError(expectedError) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with a protocol error it receive a record', () => { @@ -271,14 +274,15 @@ describe('#unit RouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }).onNext(record) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive a record', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive a record', () => { + let onProtocolErrorCalled = false const record = new Record(['a'], ['b']) const expectedErrorMessage = 'Received RECORD when resetting: received record is: ' + @@ -286,19 +290,21 @@ describe('#unit RouteObserver', () => { newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }).onNext(record) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) function newRouteObserver ({ onCompleted = shouldNotBeCalled('onComplete'), onError = shouldNotBeCalled('onError'), - connection = new FakeConnection() + onProtocolError = shouldNotBeCalled('onProtocolError') } = {}) { - return new RouteObserver({ connection, onCompleted, onError }) + return new RouteObserver({ onCompleted, onError, onProtocolError }) } function shouldNotBeCalled (methodName) { @@ -342,7 +348,8 @@ describe('#unit ProcedureRouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }) observer.onCompleted() @@ -350,19 +357,21 @@ describe('#unit ProcedureRouteObserver', () => { expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive 0 records', () => { + let onProtocolErrorCalled = false const expectedErrorMessage = 'Illegal response from router. Received 0 records but expected only one.\n' + JSON.stringify([]) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }).onCompleted() - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with a protocol error it receive more than one record', () => { @@ -377,7 +386,8 @@ describe('#unit ProcedureRouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }) observer.onNext(record) @@ -387,8 +397,8 @@ describe('#unit ProcedureRouteObserver', () => { expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive 0 records', () => { + let onProtocolErrorCalled = false const record = new Record(['a'], ['b']) const expectedErrorMessage = 'Illegal response from router. Received 2 records but expected only one.\n' + @@ -396,15 +406,17 @@ describe('#unit ProcedureRouteObserver', () => { const observer = newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }) observer.onNext(record) observer.onNext(record) observer.onCompleted() - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with the error', () => { @@ -429,34 +441,37 @@ describe('#unit ProcedureRouteObserver', () => { onError: metadata => { onErrorCalled = true expect(metadata).toBe(expectedError) - } + }, + onProtocolError: null }).onError(expectedError) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError when a protocol error occurs', () => { - const connection = new FakeConnection() + it('should call onProtocolError when a protocol error occurs', () => { + let onProtocolErrorCalled = false const expectedError = newError('something wrong', PROTOCOL_ERROR) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedError.message) + } }).onError(expectedError) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + expect(onProtocolErrorCalled).toEqual(true) }) function newRouteObserver ({ onCompleted = shouldNotBeCalled('onComplete'), onError = shouldNotBeCalled('onError'), - connection = new FakeConnection(), + onProtocolError = shouldNotBeCalled('onProtocolError'), resultObserver = new FakeResultStreamObserver() } = {}) { return new ProcedureRouteObserver({ resultObserver, - connection, + onProtocolError, onCompleted, onError }) @@ -477,9 +492,9 @@ describe('#unit ProcedureRouteObserver', () => { } }) -function newStreamObserver (connection) { +function newStreamObserver (server) { return new ResultStreamObserver({ - connection + server }) } diff --git a/test/internal/connection-channel.test.js b/test/internal/connection-channel.test.js index cf5f8957b..f14b48c6d 100644 --- a/test/internal/connection-channel.test.js +++ b/test/internal/connection-channel.test.js @@ -18,7 +18,9 @@ */ import DummyChannel from './dummy-channel' -import ChannelConnection from '../../src/internal/connection-channel' +import ChannelConnection, { + createChannelConnection +} from '../../src/internal/connection-channel' import { Packer } from '../../src/internal/packstream-v1' import { Chunker } from '../../src/internal/chunking' import { alloc } from '../../src/internal/node' @@ -33,7 +35,7 @@ import Bookmark from '../../src/internal/bookmark' import TxConfig from '../../src/internal/tx-config' import { WRITE } from '../../src/driver' import ServerAddress from '../../src/internal/server-address' -import { ResultStreamObserver } from '../../src/internal/stream-observers' +import { ResultStreamObserver } from '../../src/internal/bolt' const ILLEGAL_MESSAGE = { signature: 42, fields: [] } const SUCCESS_MESSAGE = { signature: 0x70, fields: [{}] } @@ -52,12 +54,12 @@ describe('#integration ChannelConnection', () => { } }) - it('should have correct creation timestamp', () => { + it('should have correct creation timestamp', async () => { const clock = lolex.install() try { clock.setSystemTime(424242) - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) expect(connection.creationTimestamp).toEqual(424242) } finally { @@ -66,23 +68,23 @@ describe('#integration ChannelConnection', () => { }) it('should read/write basic messages', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection._negotiateProtocol().then(() => { - connection.protocol().initialize({ - userAgent: 'mydriver/0.0.0', - authToken: basicAuthToken(), - onComplete: metadata => { - expect(metadata).not.toBeNull() - done() - }, - onError: console.log + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection.protocol().initialize({ + userAgent: 'mydriver/0.0.0', + authToken: basicAuthToken(), + onComplete: metadata => { + expect(metadata).not.toBeNull() + done() + }, + onError: done.fail.bind(done) + }) }) - }) + .catch(done.fail.bind(done)) }) - it('should retrieve stream', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should retrieve stream', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) const records = [] const pullAllObserver = { @@ -92,95 +94,82 @@ describe('#integration ChannelConnection', () => { onCompleted: () => { expect(records[0].get(0)).toBe(1) done() - } + }, + onError: done.fail.bind(done) } - connection.connect('mydriver/0.0.0', basicAuthToken()).then(() => { - connection - .protocol() - .run( - 'RETURN 1.0', - {}, - { - bookmark: Bookmark.empty(), - txConfig: TxConfig.empty(), - mode: WRITE - } - ) - .subscribe(pullAllObserver) - }) - }) - - it('should write protocol handshake', () => { - const channel = new DummyChannel() - connection = new ChannelConnection( - channel, - new ConnectionErrorHandler(SERVICE_UNAVAILABLE), - ServerAddress.fromUrl('localhost:7687'), - Logger.noOp() - ) - - connection._negotiateProtocol() - - const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x3 = '00 01 03 04' - const protocolVersion4x1 = '00 00 01 04' - const protocolVersion4x0 = '00 00 00 04' - const protocolVersion3 = '00 00 00 03' - expect(channel.toHex()).toBe( - `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` - ) + connection + .connect('mydriver/0.0.0', basicAuthToken()) + .then(() => { + connection + .protocol() + .run( + 'RETURN 1.0', + {}, + { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + mode: WRITE + } + ) + .subscribe(pullAllObserver) + }) + .catch(done.fail.bind(done)) }) - it('should provide error message when connecting to http-port', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`, { + it('should provide error message when connecting to http-port', async done => { + await createConnection(`bolt://${sharedNeo4j.hostname}:7474`, { encrypted: false }) + .then(done.fail.bind(done)) + .catch(error => { + expect(error).toBeDefined() + expect(error).not.toBeNull() - connection.connect('mydriver/0.0.0', basicAuthToken()).catch(error => { - expect(error).toBeDefined() - expect(error).not.toBeNull() - - if (testUtils.isServer()) { - // only node gets the pretty error message - expect(error.message).toBe( - 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' - ) - } - - done() - }) + if (testUtils.isServer()) { + // only node gets the pretty error message + expect(error.message).toBe( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + } + done() + }) }) it('should convert failure messages to errors', done => { const channel = new DummyChannel() - connection = new ChannelConnection( - channel, - new ConnectionErrorHandler(SERVICE_UNAVAILABLE), - ServerAddress.fromUrl('localhost:7687'), - Logger.noOp() - ) - - connection._negotiateProtocol() - const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed' const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]' - connection._queueObserver({ - onError: error => { - expectNeo4jError(error, errorCode, errorMessage) - done() - } - }) + createChannelConnection( + ServerAddress.fromUrl('localhost:7687'), + {}, + new ConnectionErrorHandler(SERVICE_UNAVAILABLE), + Logger.noOp(), + null, + () => channel + ) + .then(c => { + connection = c + connection._queueObserver({ + onCompleted: done.fail.bind(done), + onComplete: done.fail.bind(done), + onError: error => { + expectNeo4jError(error, errorCode, errorMessage) + done() + } + }) + channel.onmessage(packedFailureMessage(errorCode, errorMessage)) + }) + .catch(done.fail.bind(done)) channel.onmessage(packedHandshakeMessage()) - channel.onmessage(packedFailureMessage(errorCode, errorMessage)) }) - it('should notify when connection initialization completes', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should notify when connection initialization completes', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection .connect('mydriver/0.0.0', basicAuthToken()) @@ -188,13 +177,14 @@ describe('#integration ChannelConnection', () => { expect(initializedConnection).toBe(connection) done() }) + .catch(done.fail.bind(done)) }) - it('should notify when connection initialization fails', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`) // wrong port + it('should notify when connection initialization fails', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) // wrong port connection - .connect('mydriver/0.0.0', basicAuthToken()) + .connect('mydriver/0.0.0', basicWrongAuthToken()) .then(() => done.fail('Should not initialize')) .catch(error => { expect(error).toBeDefined() @@ -202,9 +192,8 @@ describe('#integration ChannelConnection', () => { }) }) - it('should have server version after connection initialization completed', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - + it('should have server version after connection initialization completed', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection .connect('mydriver/0.0.0', basicAuthToken()) .then(initializedConnection => { @@ -213,13 +202,14 @@ describe('#integration ChannelConnection', () => { expect(serverVersion).toBeDefined() done() }) + .catch(done.fail.bind(done)) }) - it('should fail all new observers after failure to connect', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`) // wrong port + it('should fail all new observers after failure to connect', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection - .connect('mydriver/0.0.0', basicAuthToken()) + .connect('mydriver/0.0.0', basicWrongAuthToken()) .then(() => done.fail('Should not connect')) .catch(initialError => { expect(initialError).toBeDefined() @@ -275,94 +265,113 @@ describe('#integration ChannelConnection', () => { testQueueingOfObserversWithBrokenConnection(resetAction, done) }) - it('should reset and flush when SUCCESS received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should reset and flush when SUCCESS received', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => { - expect(connection.isOpen()).toBeTruthy() - done() + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection + .resetAndFlush() + .then(() => { + expect(connection.isOpen()).toBeTruthy() + done() + }) + .catch(error => done.fail(error)) + + // write a SUCCESS message for RESET before the actual response is received + connection.protocol()._responseHandler.handleResponse(SUCCESS_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} }) - .catch(error => done.fail(error)) - - // write a SUCCESS message for RESET before the actual response is received - connection._handleMessage(SUCCESS_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} }) - }) + .catch(done.fail.bind(done)) }) - it('should fail to reset and flush when FAILURE received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual( - 'Received FAILURE as a response for RESET: Neo4jError: Hello' - ) - expect(connection._isBroken).toBeTruthy() - expect(connection.isOpen()).toBeFalsy() - done() + it('should fail to reset and flush when FAILURE received', async done => { + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { + connection + .resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual( + 'Received FAILURE as a response for RESET: Neo4jError: Hello' + ) + expect(connection._isBroken).toBeTruthy() + expect(connection.isOpen()).toBeFalsy() + done() + }) + + // write a FAILURE message for RESET before the actual response is received / white box test + connection.protocol()._responseHandler.handleResponse(FAILURE_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) }) - - // write a FAILURE message for RESET before the actual response is received - connection._handleMessage(FAILURE_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} }) - }) + .catch(done.fail.bind(done)) }) - it('should fail to reset and flush when RECORD received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should fail to reset and flush when RECORD received', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual( - 'Received RECORD when resetting: received record is: {"value":"Hello"}' - ) - expect(connection._isBroken).toBeTruthy() - expect(connection.isOpen()).toBeFalsy() - done() - }) + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection + .resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual( + 'Received RECORD when resetting: received record is: {"value":"Hello"}' + ) + expect(connection._isBroken).toBeTruthy() + expect(connection.isOpen()).toBeFalsy() + done() + }) - // write a RECORD message for RESET before the actual response is received - connection._handleMessage(RECORD_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} + // write a RECORD message for RESET before the actual response is received + connection.protocol()._responseHandler.handleResponse(RECORD_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) }) - }) + .catch(done.fail.bind(done)) }) - it('should acknowledge failure with RESET when SUCCESS received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection._currentFailure = newError('Hello') - connection._resetOnFailure() - - // write a SUCCESS message for RESET before the actual response is received - connection._handleMessage(SUCCESS_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} + it('should acknowledge failure with RESET when SUCCESS received', async done => { + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.protocol()._responseHandler._currentFailure = newError( + 'Hello' + ) // white box test, not ideal + connection._resetOnFailure() + + // write a SUCCESS message for RESET before the actual response is received + connection + .protocol() + ._responseHandler.handleResponse(SUCCESS_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) + + expect( + connection.protocol()._responseHandler._currentFailure + ).toBeNull() + done() + }) + .catch(done.fail.bind(done)) }) - - expect(connection._currentFailure).toBeNull() - done() - }) + .catch(done.fail.bind(done)) }) it('should handle and transform fatal errors', done => { @@ -378,30 +387,32 @@ describe('#integration ChannelConnection', () => { } ) - connection = ChannelConnection.create( + createChannelConnection( ServerAddress.fromUrl(`bolt://${sharedNeo4j.hostname}`), {}, errorHandler, Logger.noOp() ) - - connection._queueObserver({ - onError: error => { - expect(error).toEqual(transformedError) - expect(errors.length).toEqual(1) - expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE) - expect(addresses).toEqual([connection.address]) - done() - } - }) - - connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)) + .then(c => { + connection = c + connection._queueObserver({ + onError: error => { + expect(error).toEqual(transformedError) + expect(errors.length).toEqual(1) + expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE) + expect(addresses).toEqual([connection.address]) + done() + } + }) + connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)) + }) + .catch(done.fail.bind(done)) }) it('should send INIT/HELLO and GOODBYE messages', async () => { const messages = [] - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - recordWrittenMessages(connection, messages) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) + recordWrittenMessages(connection._protocol, messages) await connection.connect('mydriver/0.0.0', basicAuthToken()) @@ -418,7 +429,7 @@ describe('#integration ChannelConnection', () => { }) it('should not prepare broken connection to close', async () => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) await connection.connect('my-connection/9.9.9', basicAuthToken()) expect(connection._protocol).toBeDefined() @@ -470,21 +481,26 @@ describe('#integration ChannelConnection', () => { } } + function basicWrongAuthToken () { + return { + scheme: 'basic', + principal: sharedNeo4j.username + 'a', + credentials: sharedNeo4j.password + 'b' + } + } + async function testConnectionTimeout (encrypted) { const clock = jasmine.clock() clock.install() try { const boltUri = 'bolt://10.0.0.0' // use non-routable IP address which never responds - connection = createConnection( + setImmediate(() => clock.tick(1001)) + connection = await createConnection( boltUri, { encrypted: encrypted, connectionTimeout: 1000 }, 'TestErrorCode' ) - - clock.tick(1001) - - await connection.connect('mydriver/0.0.0', basicAuthToken()) } catch (error) { expect(error.code).toEqual('TestErrorCode') @@ -505,30 +521,34 @@ describe('#integration ChannelConnection', () => { } function testQueueingOfObserversWithBrokenConnection (connectionAction, done) { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection._negotiateProtocol().then(() => { - connection._handleMessage(ILLEGAL_MESSAGE) - expect(connection.isOpen()).toBeFalsy() + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection._handleProtocolError(ILLEGAL_MESSAGE) + expect(connection.isOpen()).toBeFalsy() - expect(connection._pendingObservers.length).toEqual(0) - connectionAction(connection) - expect(connection._pendingObservers.length).toEqual(0) + expect(connection.hasOngoingObservableRequests()).toBeFalsy() + connectionAction(connection) + expect(connection.hasOngoingObservableRequests()).toBeFalsy() - done() - }) + done() + }) + .catch(done.fail.bind(done)) } /** - * @return {Connection} + * @return {Promise} */ function createConnection (url, config, errorCode = null) { - return ChannelConnection.create( + const _config = config || {} + return createChannelConnection( ServerAddress.fromUrl(url), - config || {}, + _config, new ConnectionErrorHandler(errorCode || SERVICE_UNAVAILABLE), Logger.noOp() - ) + ).then(c => { + connection = c + return connection + }) } function recordWrittenMessages (connection, messages) { diff --git a/test/internal/connection-delegate.test.js b/test/internal/connection-delegate.test.js index 1dae77735..42bed041e 100644 --- a/test/internal/connection-delegate.test.js +++ b/test/internal/connection-delegate.test.js @@ -18,7 +18,7 @@ */ import DelegateConnection from '../../src/internal/connection-delegate' import Connection from '../../src/internal/connection' -import BoltProtocol from '../../src/internal/bolt-protocol-v1' +import { BoltProtocol } from '../../src/internal/bolt' import BoltAddress from '../../src/internal/server-address' import ConnectionErrorHandler from '../../src/internal/connection-error-handler' diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js deleted file mode 100644 index 2d6e0f99c..000000000 --- a/test/internal/protocol-handshaker.test.js +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import ProtocolHandshaker from '../../src/internal/protocol-handshaker' -import Logger from '../../src/internal/logger' -import BoltProtocol from '../../src/internal/bolt-protocol-v1' -import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' - -import { alloc } from '../../src/internal/node' - -describe('#unit ProtocolHandshaker', () => { - it('should write handshake request', () => { - const writtenBuffers = [] - const fakeChannel = { - write: buffer => writtenBuffers.push(buffer) - } - - const handshaker = new ProtocolHandshaker( - null, - fakeChannel, - null, - false, - Logger.noOp() - ) - - handshaker.writeHandshakeRequest() - - expect(writtenBuffers.length).toEqual(1) - - const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x3 = '00 01 03 04' - const protocolVersion4x1 = '00 00 01 04' - const protocolVersion4x0 = '00 00 00 04' - const protocolVersion3 = '00 00 00 03' - - expect(writtenBuffers[0].toHex()).toEqual( - `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` - ) - }) - - it('should create protocol with valid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V1 - const buffer = handshakeResponse(1) - - const protocol = handshaker.createNegotiatedProtocol(buffer) - - expect(protocol).toBeDefined() - expect(protocol).not.toBeNull() - expect(protocol instanceof BoltProtocol).toBeTruthy() - }) - - it('should create protocol 4.3', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V4.3 - const buffer = handshakeResponse(4, 3) - - const protocol = handshaker.createNegotiatedProtocol(buffer) - - expect(protocol).toBeDefined() - expect(protocol).not.toBeNull() - expect(protocol.version).toEqual(4.3) - expect(protocol instanceof BoltProtocolV4x3).toBeTruthy() - }) - - it('should fail to create protocol from invalid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V42 which is invalid - const buffer = handshakeResponse(42) - - expect(() => handshaker.createNegotiatedProtocol(buffer)).toThrow() - }) - - it('should fail to create protocol from HTTP as invalid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with HTTP magic int - const buffer = handshakeResponse(1213486160) - - expect(() => handshaker.createNegotiatedProtocol(buffer)).toThrow() - }) -}) - -/** - * @param {number} version - * @return {BaseBuffer} - */ -function handshakeResponse (version, minor = 0) { - const buffer = alloc(4) - buffer.writeInt32((minor << 8) | version) - buffer.reset() - return buffer -} diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index 6b3fbfa06..b78c98aad 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import RawRoutingTable from '../../src/internal/routing-table-raw' +import { RawRoutingTable } from '../../src/internal/bolt' import Rediscovery from '../../src/internal/rediscovery' import RoutingTable from '../../src/internal/routing-table' import ServerAddress from '../../src/internal/server-address' diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index 25803eb59..2cd6cca49 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -20,7 +20,7 @@ import RoutingTable from '../../src/internal/routing-table' import Integer, { int } from '../../src/integer' import { READ, WRITE } from '../../src/driver' import ServerAddress from '../../src/internal/server-address' -import RawRoutingTable from '../../src/internal/routing-table-raw' +import { RawRoutingTable } from '../../src/internal/bolt' import { PROTOCOL_ERROR } from '../../src/error' import lolex from 'lolex' diff --git a/test/internal/test-utils.js b/test/internal/test-utils.js index 542b48d70..a79b326bc 100644 --- a/test/internal/test-utils.js +++ b/test/internal/test-utils.js @@ -16,6 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { Chunker } from '../../src/internal/chunking' import Connection from '../../src/internal/connection' function isClient () { @@ -115,10 +116,33 @@ class MessageRecordingConnection extends Connection { } } +function spyProtocolWrite (protocol, callRealMethod = false) { + protocol.messages = [] + protocol.observers = [] + protocol.flushes = [] + + const write = callRealMethod ? protocol.write.bind(protocol) : () => true + protocol.write = (message, observer, flush) => { + protocol.messages.push(message) + protocol.observers.push(observer) + protocol.flushes.push(flush) + return write(message, observer, flush) + } + + protocol.verifyMessageCount = expected => { + expect(protocol.messages.length).toEqual(expected) + expect(protocol.observers.length).toEqual(expected) + expect(protocol.flushes.length).toEqual(expected) + } + + return protocol +} + export default { isClient, isServer, fakeStandardDateWithOffset, matchers, - MessageRecordingConnection + MessageRecordingConnection, + spyProtocolWrite } diff --git a/test/session.test.js b/test/session.test.js index 52b2d2438..dc2f018a6 100644 --- a/test/session.test.js +++ b/test/session.test.js @@ -392,7 +392,7 @@ describe('#integration session', () => { // wait some time than close the session with a long running query setTimeout(() => { - session.close() + session.close().catch(done.fail.bind(done)) }, 1000) }, 70000)