diff --git a/index.js b/index.js index c283685..8ad2bdc 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,3 @@ -var async = require('async'); var mongodb = require('./mongodb'); var DB = require('sharedb').DB; var OpLinkValidator = require('./op-link-validator'); @@ -55,15 +54,21 @@ function ShareDbMongo(mongo, options) { // Track whether the close method has been called this.closed = false; + this.mongo = null; + this._mongoClient = null; + this.mongoPoll = null; + this._mongoPollClient = null; + if (typeof mongo === 'string' || typeof mongo === 'function') { - // We can only get the mongodb client instance in a callback, so - // buffer up any requests received in the meantime - this.mongo = null; - this._mongoClient = null; - this.mongoPoll = null; - this._mongoPollClient = null; - this.pendingConnect = []; - this._connect(mongo, options); + var self = this; + this._connection = this._connect(mongo, options) + .then(function(result) { + self.mongo = result.mongo; + self._mongoClient = result.mongoClient; + self.mongoPoll = result.mongoPoll; + self._mongoPollClient = result.mongoPollClient; + return result; + }); } else { throw new Error('deprecated: pass mongo as url string or function with callback'); } @@ -115,19 +120,10 @@ ShareDbMongo.prototype.getDbs = function(callback) { var err = ShareDbMongo.alreadyClosedError(); return callback(err); } - // We consider ouself ready to reply if this.mongo is defined and don't check - // this.mongoPoll, since it is optional and is null by default. Thus, it's - // important that these two properties are only set together synchronously - if (this.mongo) return callback(null, this.mongo, this.mongoPoll); - this.pendingConnect.push(callback); -}; - -ShareDbMongo.prototype._flushPendingConnect = function() { - var pendingConnect = this.pendingConnect; - this.pendingConnect = null; - for (var i = 0; i < pendingConnect.length; i++) { - pendingConnect[i](null, this.mongo, this.mongoPoll); - } + this._connection + .then(function(result) { + callback(null, result.mongo, result.mongoPoll); + }, callback); }; function isLegacyMongoClient(client) { @@ -145,72 +141,55 @@ ShareDbMongo.prototype._connect = function(mongo, options) { // // Throw errors in this function if we fail to connect, since we aren't // implementing a way to retry - var self = this; + var connections = [connect(mongo, options.mongoOptions)]; var mongoPoll = options.mongoPoll; - if (mongoPoll) { - var tasks = { - mongoClient: connect(mongo, options.mongoOptions), - mongoPollClient: connect(mongoPoll, options.mongoPollOptions) + if (mongoPoll) connections.push(connect(mongoPoll, options.mongoPollOptions)); + + return Promise.all(connections).then(function(clients) { + var mongoClient = clients[0]; + var mongoPollClient = clients[1]; + var result = { + mongo: mongoClient, + mongoClient: mongoClient, + mongoPoll: mongoPollClient, + mongoPollClient: mongoPollClient }; - async.parallel(tasks, function(err, results) { - if (err) throw err; - var mongoClient = results.mongoClient; - var mongoPollClient = results.mongoPollClient; - if (isLegacyMongoClient(mongoClient)) { - self.mongo = self._mongoClient = mongoClient; - self.mongoPoll = self._mongoPollClient = mongoPollClient; - } else { - self.mongo = mongoClient.db(); - self._mongoClient = mongoClient; - self.mongoPoll = mongoPollClient.db(); - self._mongoPollClient = mongoPollClient; - } - self._flushPendingConnect(); - }); - return; - } - var finish = function(err, client) { - if (err) throw err; - if (isLegacyMongoClient(client)) { - self.mongo = self._mongoClient = client; - } else { - self.mongo = client.db(); - self._mongoClient = client; + if (!isLegacyMongoClient(mongoClient)) { + result.mongo = mongoClient.db(); + if (mongoPollClient) result.mongoPoll = mongoPollClient.db(); } - self._flushPendingConnect(); - }; - if (typeof mongo === 'function') { - mongo(finish); - return; - } - // TODO: Don't pass options directly to mongodb.connect(); - // only pass options.mongoOptions - var mongoOptions = options.mongoOptions || options; - connect(mongo, mongoOptions)(finish); + return result; + }); }; function connect(mongo, options) { - if (typeof mongo === 'function') return mongo; - return function(callback) { - options = Object.assign({}, options); - delete options.mongo; - delete options.mongoPoll; - delete options.mongoPollOptions; - delete options.pollDelay; - delete options.disableIndexCreation; - delete options.allowAllQueries; - delete options.allowJSQueries; - delete options.allowAllQueries; - delete options.allowAggregateQueries; - delete options.getOpsWithoutStrictLinking; - - if (typeof mongodb.connect === 'function') { - mongodb.connect(mongo, options, callback); - } else { - var client = new mongodb.MongoClient(mongo, options); - client.connect(callback); - } - }; + if (typeof mongo === 'function') { + return new Promise(function(resolve, reject) { + mongo(function(error, client) { + if (error) return reject(error); + resolve(client); + }); + }); + } + + options = Object.assign({}, options); + delete options.mongo; + delete options.mongoPoll; + delete options.mongoPollOptions; + delete options.pollDelay; + delete options.disableIndexCreation; + delete options.allowAllQueries; + delete options.allowJSQueries; + delete options.allowAllQueries; + delete options.allowAggregateQueries; + delete options.getOpsWithoutStrictLinking; + + if (typeof mongodb.connect === 'function') { + return mongodb.connect(mongo, options); + } else { + var client = new mongodb.MongoClient(mongo, options); + return client.connect(); + } } ShareDbMongo.prototype.close = function(callback) { @@ -225,11 +204,13 @@ ShareDbMongo.prototype.close = function(callback) { if (err && err.code === 5101) return callback(); if (err) return callback(err); self.closed = true; - self._mongoClient.close(function(err) { - if (err) return callback(err); - if (!self._mongoPollClient) return callback(); - self._mongoPollClient.close(callback); - }); + self._mongoClient.close() + .then(function() { + return self._mongoPollClient && self._mongoPollClient.close(); + }) + .then(function() { + callback(null); + }, callback); }); }; @@ -280,14 +261,20 @@ ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, cal var doc = shallowClone(op); doc.d = id; doc.o = snapshot._opLink; - opCollection.insertOne(doc, callback); + opCollection.insertOne(doc) + .then(function(result) { + callback(null, result); + }, callback); }); }; ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); - opCollection.deleteOne({_id: opId}, callback); + opCollection.deleteOne({_id: opId}) + .then(function(result) { + callback(null, result); + }, callback); }); }; @@ -301,17 +288,20 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca if (middlewareErr) { return callback(middlewareErr); } - collection.insertOne(request.documentToWrite, function(err) { - if (err) { - // Return non-success instead of duplicate key error, since this is - // expected to occur during simultaneous creates on the same id - if (err.code === 11000 && /\b_id_\b/.test(err.message)) { - return callback(null, false); + collection.insertOne(request.documentToWrite) + .then( + function() { + callback(null, true); + }, + function(err) { + // Return non-success instead of duplicate key error, since this is + // expected to occur during simultaneous creates on the same id + if (err.code === 11000 && /\b_id_\b/.test(err.message)) { + return callback(null, false); + } + return callback(err); } - return callback(err); - } - callback(null, true); - }); + ); }); } else { request.query = {_id: id, _v: request.documentToWrite._v - 1}; @@ -319,11 +309,11 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca if (middlewareErr) { return callback(middlewareErr); } - collection.replaceOne(request.query, request.documentToWrite, function(err, result) { - if (err) return callback(err); - var succeeded = !!result.modifiedCount; - callback(null, succeeded); - }); + collection.replaceOne(request.query, request.documentToWrite) + .then(function(result) { + var succeeded = !!result.modifiedCount; + callback(null, succeeded); + }, callback); }); } }); @@ -343,11 +333,11 @@ ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, option self._middleware.trigger(MiddlewareHandler.Actions.beforeSnapshotLookup, request, function(middlewareErr) { if (middlewareErr) return callback(middlewareErr); - collection.find(request.query, request.findOptions).limit(1).project(projection).next(function(err, doc) { - if (err) return callback(err); - var snapshot = (doc) ? castToSnapshot(doc) : new MongoSnapshot(id, 0, null, undefined); - callback(null, snapshot); - }); + collection.find(request.query, request.findOptions).limit(1).project(projection).next() + .then(function(doc) { + var snapshot = (doc) ? castToSnapshot(doc) : new MongoSnapshot(id, 0, null, undefined); + callback(null, snapshot); + }, callback); }); }); }; @@ -363,20 +353,20 @@ ShareDbMongo.prototype.getSnapshotBulk = function(collectionName, ids, fields, o self._middleware.trigger(MiddlewareHandler.Actions.beforeSnapshotLookup, request, function(middlewareErr) { if (middlewareErr) return callback(middlewareErr); - collection.find(request.query, request.findOptions).project(projection).toArray(function(err, docs) { - if (err) return callback(err); - var snapshotMap = {}; - for (var i = 0; i < docs.length; i++) { - var snapshot = castToSnapshot(docs[i]); - snapshotMap[snapshot.id] = snapshot; - } - for (var i = 0; i < ids.length; i++) { - var id = ids[i]; - if (snapshotMap[id]) continue; - snapshotMap[id] = new MongoSnapshot(id, 0, null, undefined); - } - callback(null, snapshotMap); - }); + collection.find(request.query, request.findOptions).project(projection).toArray() + .then(function(docs) { + var snapshotMap = {}; + for (var i = 0; i < docs.length; i++) { + var snapshot = castToSnapshot(docs[i]); + snapshotMap[snapshot.id] = snapshot; + } + for (var i = 0; i < ids.length; i++) { + var id = ids[i]; + if (snapshotMap[id]) continue; + snapshotMap[id] = new MongoSnapshot(id, 0, null, undefined); + } + callback(null, snapshotMap); + }, callback); }); }); }; @@ -423,14 +413,14 @@ ShareDbMongo.prototype.getOpCollection = function(collectionName, callback) { // collection this won't be a problem, but this is a dangerous mechanism. // Perhaps we should only warn instead of creating the indexes, especially // when there is a lot of data in the collection. - collection.createIndex({d: 1, v: 1}, {background: true}, function(err) { - if (err) return callback(err); - collection.createIndex({src: 1, seq: 1, v: 1}, {background: true}, function(err) { - if (err) return callback(err); + collection.createIndex({d: 1, v: 1}, {background: true}) + .then(function() { + return collection.createIndex({src: 1, seq: 1, v: 1}, {background: true}); + }) + .then(function() { self.opIndexes[collectionName] = true; callback(null, collection); - }); - }); + }, callback); }); }; @@ -544,27 +534,27 @@ ShareDbMongo.prototype.getCommittedOpVersion = function(collectionName, id, snap // Since ops are optimistically written prior to writing the snapshot, the // op could end up being written multiple times or have been written but // not count as committed if not backreferenced from the snapshot - opCollection.find(query).project(projection).sort(sort).limit(1).next(function(err, doc) { - if (err) return callback(err); - // If we find no op with the same src and seq, we definitely don't have - // any match. This should prevent us from accidentally querying a huge - // history of ops - if (!doc) return callback(); - // If we do find an op with the same src and seq, we still have to get - // the ops from the snapshot to figure out if the op was actually - // committed already, and at what version in case of multiple matches - var from = doc.v; - self.getOpsToSnapshot(collectionName, id, from, snapshot, options, function(err, ops) { - if (err) return callback(err); - for (var i = ops.length; i--;) { - var item = ops[i]; - if (op.src === item.src && op.seq === item.seq) { - return callback(null, item.v); + opCollection.find(query).project(projection).sort(sort).limit(1).next() + .then(function(doc) { + // If we find no op with the same src and seq, we definitely don't have + // any match. This should prevent us from accidentally querying a huge + // history of ops + if (!doc) return callback(); + // If we do find an op with the same src and seq, we still have to get + // the ops from the snapshot to figure out if the op was actually + // committed already, and at what version in case of multiple matches + var from = doc.v; + self.getOpsToSnapshot(collectionName, id, from, snapshot, options, function(err, ops) { + if (err) return callback(err); + for (var i = ops.length; i--;) { + var item = ops[i]; + if (op.src === item.src && op.seq === item.seq) { + return callback(null, item.v); + } } - } - callback(); - }); - }); + callback(); + }); + }, callback); }); }; @@ -677,7 +667,10 @@ ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, // for tracking purposes var projection = (options && options.metadata) ? {d: 0} : {d: 0, m: 0}; var sort = {v: 1}; - opCollection.find(query).project(projection).sort(sort).toArray(callback); + opCollection.find(query).project(projection).sort(sort).toArray() + .then(function(result) { + callback(null, result); + }, callback); }); }; @@ -776,21 +769,23 @@ function getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback) { return closeCursor(cursor, callback, error, opWithUniqueVersion); } - cursor.next(function(error, op) { - if (error) { - return closeCursor(cursor, callback, error); - } - - opLinkValidator.push(op); - getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback); - }); + cursor.next() + .then( + function(op) { + opLinkValidator.push(op); + getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback); + }, + function(error) { + closeCursor(cursor, callback, error); + } + ); } function closeCursor(cursor, callback, error, returnValue) { - cursor.close(function(closeError) { - error = error || closeError; - callback(error, returnValue); - }); + cursor.close() + .then(function() { + callback(error, returnValue); + }, callback); } ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options, callback) { @@ -804,7 +799,10 @@ ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options request.query = query; self._middleware.trigger(MiddlewareHandler.Actions.beforeSnapshotLookup, request, function(middlewareErr) { if (middlewareErr) return callback(middlewareErr); - collection.find(query, request.findOptions).limit(1).project(projection).next(callback); + collection.find(query, request.findOptions).limit(1).project(projection).next() + .then(function(result) { + callback(null, result); + }, callback); }); }); }; @@ -820,7 +818,10 @@ ShareDbMongo.prototype._getSnapshotOpLinkBulk = function(collectionName, ids, op request.query = query; self._middleware.trigger(MiddlewareHandler.Actions.beforeSnapshotLookup, request, function(middlewareErr) { if (middlewareErr) return callback(middlewareErr); - collection.find(query, request.findOptions).project(projection).toArray(callback); + collection.find(query, request.findOptions).project(projection).toArray() + .then(function(result) { + callback(null, result); + }, callback); }); }); }; @@ -882,7 +883,10 @@ ShareDbMongo.prototype._query = function(collection, inputQuery, projection, cal // If no collection operation or cursor operations were used, return // an array of snapshots that are passed in the "results" argument // in the callback - cursor.toArray(callback); + cursor.toArray() + .then(function(result) { + callback(null, result); + }, callback); }; ShareDbMongo.prototype.query = function(collectionName, inputQuery, fields, options, callback) { @@ -955,9 +959,10 @@ ShareDbMongo.prototype.queryPollDoc = function(collectionName, id, inputQuery, o parsed.query._id = id; } - collection.find(parsed.query).limit(1).project({_id: 1}).next(function(err, doc) { - callback(err, !!doc); - }); + collection.find(parsed.query).limit(1).project({_id: 1}).next() + .then(function(doc) { + callback(null, !!doc); + }, callback); }); }; @@ -1482,11 +1487,17 @@ function getProjection(fields, options) { var collectionOperationsMap = { $distinct: function(collection, query, value, cb) { - collection.distinct(value.field, query, cb); + collection.distinct(value.field, query) + .then(function(result) { + cb(null, result); + }, cb); }, $aggregate: function(collection, query, value, cb) { var cursor = collection.aggregate(value); - cursor.toArray(cb); + cursor.toArray() + .then(function(result) { + cb(null, result); + }, cb); }, $mapReduce: function(collection, query, value, cb) { if (typeof value !== 'object') { @@ -1498,20 +1509,31 @@ var collectionOperationsMap = { out: {inline: 1}, scope: value.scope || {} }; - collection.mapReduce( - value.map, value.reduce, mapReduceOptions, cb); + collection.mapReduce(value.map, value.reduce, mapReduceOptions) + .then(function(result) { + cb(null, result); + }, cb); } }; var cursorOperationsMap = { $count: function(cursor, value, cb) { - cursor.count(cb); + cursor.count() + .then(function(result) { + cb(null, result); + }, cb); }, $explain: function(cursor, verbosity, cb) { - cursor.explain(verbosity, cb); + cursor.explain(verbosity) + .then(function(result) { + cb(null, result); + }, cb); }, $map: function(cursor, fn, cb) { - cursor.map(fn, cb); + cursor.map(fn) + .then(function(result) { + cb(null, result); + }, cb); } }; diff --git a/package.json b/package.json index cd4a40d..03f9ad3 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,6 @@ "description": "MongoDB database adapter for ShareDB", "main": "index.js", "dependencies": { - "async": "^2.6.3", "mongodb": "^2.1.2 || ^3.1.13 || ^4.0.0", "sharedb": "^1.9.1 || ^2.0.0 || ^3.0.0" }, diff --git a/test/test_get_ops.js b/test/test_get_ops.js index 7510a9d..3eeb34e 100644 --- a/test/test_get_ops.js +++ b/test/test_get_ops.js @@ -12,10 +12,11 @@ function create(options, callback) { var db = new ShareDbMongo(mongoUrl, opts); db.getDbs(function(err, mongo) { if (err) return callback(err); - mongo.dropDatabase(function(err) { - if (err) return callback(err); - callback(null, db, mongo); - }); + mongo.dropDatabase() + .then(function() { + callback(null, db, mongo); + }) + .catch(callback); }); }; @@ -62,7 +63,9 @@ function create(options, callback) { commitOpChain(db, mongo, collection, id, ops, function(error) { if (error) done(error); - mongo.collection('o_' + collection).deleteOne({v: 1}, done); + mongo.collection('o_' + collection).deleteOne({v: 1}).then(function() { + done(); + }); }); }); @@ -127,8 +130,7 @@ function commitOpChain(db, mongo, collection, id, ops, previousOpId, version, ca var snapshot = {id: id, v: version + 1, type: 'json0', data: {}, m: null, _opLink: previousOpId}; db.commit(collection, id, op, snapshot, null, function(error) { if (error) return callback(error); - mongo.collection('o_' + collection).find({d: id, v: version}).next(function(error, op) { - if (error) return callback(error); + mongo.collection('o_' + collection).find({d: id, v: version}).next().then(function(op) { commitOpChain(db, mongo, collection, id, ops, (op ? op._id : null), ++version, callback); }); }); diff --git a/test/test_get_ops_without_strict_linking.js b/test/test_get_ops_without_strict_linking.js index 9616b9c..4dfb701 100644 --- a/test/test_get_ops_without_strict_linking.js +++ b/test/test_get_ops_without_strict_linking.js @@ -12,10 +12,11 @@ function create(callback) { }); db.getDbs(function(err, mongo) { if (err) return callback(err); - mongo.dropDatabase(function(err) { - if (err) return callback(err); - callback(null, db, mongo); - }); + mongo.dropDatabase() + .then(function() { + callback(null, db, mongo); + }) + .catch(callback); }); }; @@ -78,7 +79,11 @@ describe('getOpsWithoutStrictLinking: true', function() { callInSeries([ function(next) { - mongo.collection('o_' + collection).insertOne(spuriousOp, next); + mongo.collection('o_' + collection).insertOne(spuriousOp) + .then(function(result) { + next(null, result); + }) + .catch(next); }, function(result, next) { db.getOps(collection, id, 0, 2, null, next); @@ -99,7 +104,11 @@ describe('getOpsWithoutStrictLinking: true', function() { callInSeries([ function(next) { - mongo.collection('o_' + collection).insertOne(spuriousOp, next); + mongo.collection('o_' + collection).insertOne(spuriousOp) + .then(function(result) { + next(null, result); + }) + .catch(next); }, function(result, next) { db.getOps(collection, id, 0, 2, null, next); @@ -125,7 +134,11 @@ describe('getOpsWithoutStrictLinking: true', function() { callInSeries([ function(next) { - mongo.collection('o_' + collection).insertMany(spuriousOps, next); + mongo.collection('o_' + collection).insertMany(spuriousOps) + .then(function(result) { + next(null, result); + }) + .catch(next); }, function(result, next) { db.getOps(collection, id, 0, 2, null, next); @@ -160,10 +173,11 @@ function commitOpChain(db, mongo, collection, id, ops, previousOpId, version, ca var snapshot = {id: id, v: version + 1, type: 'json0', data: {}, m: null, _opLink: previousOpId}; db.commit(collection, id, op, snapshot, null, function(error) { if (error) return callback(error); - mongo.collection('o_' + collection).find({d: id, v: version}).next(function(error, op) { - if (error) return callback(error); - commitOpChain(db, mongo, collection, id, ops, op._id, ++version, callback); - }); + mongo.collection('o_' + collection).find({d: id, v: version}).next() + .then(function(op) { + commitOpChain(db, mongo, collection, id, ops, op._id, ++version, callback); + }) + .catch(callback); }); } diff --git a/test/test_mongo.js b/test/test_mongo.js index b3abf97..c288633 100644 --- a/test/test_mongo.js +++ b/test/test_mongo.js @@ -9,10 +9,11 @@ function create(callback) { var db = new ShareDbMongo(mongoUrl); db.getDbs(function(err, mongo) { if (err) return callback(err); - mongo.dropDatabase(function(err) { - if (err) return callback(err); - callback(null, db, mongo); - }); + mongo.dropDatabase() + .then(function() { + callback(null, db, mongo); + }) + .catch(callback); }); }; @@ -38,8 +39,7 @@ describe('mongo db', function() { var mongo = this.mongo; this.db.commit('testcollection', 'foo', {v: 0, create: {}}, {}, null, function(err) { if (err) return done(err); - mongo.collection('o_testcollection').indexInformation(function(err, indexes) { - if (err) return done(err); + mongo.collection('o_testcollection').indexInformation().then(function(indexes) { // Index for getting document(s) ops expect(indexes['d_1_v_1']).ok; // Index for checking committed op(s) by src and seq @@ -51,8 +51,7 @@ describe('mongo db', function() { it('respects unique indexes', function(done) { var db = this.db; - this.mongo.collection('testcollection').createIndex({x: 1}, {unique: true}, function(err) { - if (err) return done(err); + this.mongo.collection('testcollection').createIndex({x: 1}, {unique: true}).then(function() { db.commit('testcollection', 'foo', {v: 0, create: {}}, {v: 1, data: {x: 7}}, null, function(err) { if (err) return done(err); db.commit('testcollection', 'bar', {v: 0, create: {}}, {v: 1, data: {x: 7}}, null, function(err) { @@ -381,10 +380,11 @@ describe('mongo db connection', function() { // logic. this.db.getDbs(function(err, mongo) { if (err) return done(err); - mongo.dropDatabase(function(err) { - if (err) return done(err); - done(); - }); + mongo.dropDatabase() + .then(function() { + done(); + }) + .catch(done); }); }); diff --git a/test/test_mongo_middleware.js b/test/test_mongo_middleware.js index 5124262..c4716aa 100644 --- a/test/test_mongo_middleware.js +++ b/test/test_mongo_middleware.js @@ -16,10 +16,11 @@ function create(callback) { var db = new ShareDbMongo(mongoUrl); db.getDbs(function(err, mongo) { if (err) return callback(err); - mongo.dropDatabase(function(err) { - if (err) return callback(err); - callback(null, db, mongo); - }); + mongo.dropDatabase() + .then(function() { + callback(null, db, mongo); + }) + .catch(callback); }); }