First version with PostgreSQL
[oweals/peertube.git] / server / models / request.js
index c2cfe83cedcc0a3607b56961665a8159ce8f3581..882f747b76f76eaaa4e984f3f86a87c05f733c26 100644 (file)
@@ -2,66 +2,58 @@
 
 const each = require('async/each')
 const eachLimit = require('async/eachLimit')
-const values = require('lodash/values')
-const mongoose = require('mongoose')
 const waterfall = require('async/waterfall')
 
 const constants = require('../initializers/constants')
 const logger = require('../helpers/logger')
 const requests = require('../helpers/requests')
 
-const Pod = mongoose.model('Pod')
-
 let timer = null
 let lastRequestTimestamp = 0
 
 // ---------------------------------------------------------------------------
 
-const RequestSchema = mongoose.Schema({
-  request: mongoose.Schema.Types.Mixed,
-  endpoint: {
-    type: String,
-    enum: [ values(constants.REQUEST_ENDPOINTS) ]
-  },
-  to: [
+module.exports = function (sequelize, DataTypes) {
+  const Request = sequelize.define('Request',
+    {
+      request: {
+        type: DataTypes.JSON
+      },
+      endpoint: {
+        // TODO: enum?
+        type: DataTypes.STRING
+      }
+    },
     {
-      type: mongoose.Schema.Types.ObjectId,
-      ref: 'Pod'
+      classMethods: {
+        associate,
+
+        activate,
+        countTotalRequests,
+        deactivate,
+        flush,
+        forceSend,
+        remainingMilliSeconds
+      }
     }
-  ]
-})
-
-RequestSchema.statics = {
-  activate,
-  deactivate,
-  flush,
-  forceSend,
-  list,
-  remainingMilliSeconds
-}
-
-RequestSchema.pre('save', function (next) {
-  const self = this
-
-  if (self.to.length === 0) {
-    Pod.listAllIds(function (err, podIds) {
-      if (err) return next(err)
-
-      // No friends
-      if (podIds.length === 0) return
-
-      self.to = podIds
-      return next()
-    })
-  } else {
-    return next()
-  }
-})
+  )
 
-mongoose.model('Request', RequestSchema)
+  return Request
+}
 
 // ------------------------------ STATICS ------------------------------
 
+function associate (models) {
+  this.belongsToMany(models.Pod, {
+    foreignKey: {
+      name: 'requestId',
+      allowNull: false
+    },
+    through: models.RequestToPod,
+    onDelete: 'CASCADE'
+  })
+}
+
 function activate () {
   logger.info('Requests scheduler activated.')
   lastRequestTimestamp = Date.now()
@@ -73,6 +65,14 @@ function activate () {
   }, constants.REQUESTS_INTERVAL)
 }
 
+function countTotalRequests (callback) {
+  const query = {
+    include: [ this.sequelize.models.Pod ]
+  }
+
+  return this.count(query).asCallback(callback)
+}
+
 function deactivate () {
   logger.info('Requests scheduler deactivated.')
   clearInterval(timer)
@@ -90,10 +90,6 @@ function forceSend () {
   makeRequests.call(this)
 }
 
-function list (callback) {
-  this.find({ }, callback)
-}
-
 function remainingMilliSeconds () {
   if (timer === null) return -1
 
@@ -136,6 +132,7 @@ function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
 // Make all the requests of the scheduler
 function makeRequests () {
   const self = this
+  const RequestToPod = this.sequelize.models.RequestToPod
 
   // We limit the size of the requests (REQUESTS_LIMIT)
   // We don't want to stuck with the same failing requests so we get a random list
@@ -156,20 +153,20 @@ function makeRequests () {
     // We want to group requests by destinations pod and endpoint
     const requestsToMakeGrouped = {}
 
-    requests.forEach(function (poolRequest) {
-      poolRequest.to.forEach(function (toPodId) {
-        const hashKey = toPodId + poolRequest.endpoint
+    requests.forEach(function (request) {
+      request.Pods.forEach(function (toPod) {
+        const hashKey = toPod.id + request.endpoint
         if (!requestsToMakeGrouped[hashKey]) {
           requestsToMakeGrouped[hashKey] = {
-            toPodId,
-            endpoint: poolRequest.endpoint,
-            ids: [], // pool request ids, to delete them from the DB in the future
+            toPodId: toPod.id,
+            endpoint: request.endpoint,
+            ids: [], // request ids, to delete them from the DB in the future
             datas: [] // requests data,
           }
         }
 
-        requestsToMakeGrouped[hashKey].ids.push(poolRequest._id)
-        requestsToMakeGrouped[hashKey].datas.push(poolRequest.request)
+        requestsToMakeGrouped[hashKey].ids.push(request.id)
+        requestsToMakeGrouped[hashKey].datas.push(request.request)
       })
     })
 
@@ -179,8 +176,8 @@ function makeRequests () {
     eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
       const requestToMake = requestsToMakeGrouped[hashKey]
 
-      // FIXME: mongodb request inside a loop :/
-      Pod.load(requestToMake.toPodId, function (err, toPod) {
+      // FIXME: SQL request inside a loop :/
+      self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
         if (err) {
           logger.error('Error finding pod by id.', { err: err })
           return callbackEach()
@@ -191,7 +188,7 @@ function makeRequests () {
           const requestIdsToDelete = requestToMake.ids
 
           logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
-          removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
+          RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
           return callbackEach()
         }
 
@@ -202,7 +199,7 @@ function makeRequests () {
             goodPods.push(requestToMake.toPodId)
 
             // Remove the pod id of these request ids
-            removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach)
+            RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
           } else {
             badPods.push(requestToMake.toPodId)
             callbackEach()
@@ -211,18 +208,22 @@ function makeRequests () {
       })
     }, function () {
       // All the requests were made, we update the pods score
-      updatePodsScore(goodPods, badPods)
+      updatePodsScore.call(self, goodPods, badPods)
       // Flush requests with no pod
-      removeWithEmptyTo.call(self)
+      removeWithEmptyTo.call(self, function (err) {
+        if (err) logger.error('Error when removing requests with no pods.', { error: err })
+      })
     })
   })
 }
 
 // Remove pods with a score of 0 (too many requests where they were unreachable)
 function removeBadPods () {
+  const self = this
+
   waterfall([
     function findBadPods (callback) {
-      Pod.listBadPods(function (err, pods) {
+      self.sequelize.models.Pod.listBadPods(function (err, pods) {
         if (err) {
           logger.error('Cannot find bad pods.', { error: err })
           return callback(err)
@@ -233,10 +234,8 @@ function removeBadPods () {
     },
 
     function removeTheseBadPods (pods, callback) {
-      if (pods.length === 0) return callback(null, 0)
-
       each(pods, function (pod, callbackEach) {
-        pod.remove(callbackEach)
+        pod.destroy().asCallback(callbackEach)
       }, function (err) {
         return callback(err, pods.length)
       })
@@ -253,43 +252,67 @@ function removeBadPods () {
 }
 
 function updatePodsScore (goodPods, badPods) {
+  const self = this
+  const Pod = this.sequelize.models.Pod
+
   logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
 
-  Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
-    if (err) logger.error('Cannot increment scores of good pods.')
-  })
+  if (goodPods.length !== 0) {
+    Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
+      if (err) logger.error('Cannot increment scores of good pods.')
+    })
+  }
 
-  Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
-    if (err) logger.error('Cannot decrement scores of bad pods.')
-    removeBadPods()
-  })
+  if (badPods.length !== 0) {
+    Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
+      if (err) logger.error('Cannot decrement scores of bad pods.')
+      removeBadPods.call(self)
+    })
+  }
 }
 
 function listWithLimitAndRandom (limit, callback) {
   const self = this
 
-  self.count(function (err, count) {
+  self.count().asCallback(function (err, count) {
     if (err) return callback(err)
 
+    // Optimization...
+    if (count === 0) return callback(null, [])
+
     let start = Math.floor(Math.random() * count) - limit
     if (start < 0) start = 0
 
-    self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback)
+    const query = {
+      order: [
+        [ 'id', 'ASC' ]
+      ],
+      offset: start,
+      limit: limit,
+      include: [ this.sequelize.models.Pod ]
+    }
+
+    self.findAll(query).asCallback(callback)
   })
 }
 
 function removeAll (callback) {
-  this.remove({ }, callback)
-}
-
-function removePodOf (requestsIds, podId, callback) {
-  if (!callback) callback = function () {}
-
-  this.update({ _id: { $in: requestsIds } }, { $pull: { to: podId } }, { multi: true }, callback)
+  // Delete all requests
+  this.destroy({ truncate: true }).asCallback(callback)
 }
 
 function removeWithEmptyTo (callback) {
   if (!callback) callback = function () {}
 
-  this.remove({ to: { $size: 0 } }, callback)
+  const query = {
+    where: {
+      id: {
+        $notIn: [
+          this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
+        ]
+      }
+    }
+  }
+
+  this.destroy(query).asCallback(callback)
 }