client.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. 'use strict'
  2. // eslint-disable-next-line
  3. var Native
  4. // eslint-disable-next-line no-useless-catch
  5. try {
  6. // Wrap this `require()` in a try-catch to avoid upstream bundlers from complaining that this might not be available since it is an optional import
  7. Native = require('pg-native')
  8. } catch (e) {
  9. throw e
  10. }
  11. const TypeOverrides = require('../type-overrides')
  12. const EventEmitter = require('events').EventEmitter
  13. const util = require('util')
  14. const ConnectionParameters = require('../connection-parameters')
  15. const NativeQuery = require('./query')
  /**
   * Client backed by the native bindings loaded above as `Native`.
   *
   * Sets up the type overrides, the native handle, the query queue and the
   * connection-state flags, then copies the resolved connection parameters
   * onto the instance.
   *
   * @param {object} [config] - client options; a falsy value is treated as `{}`.
   *   Keys read here: `Promise`, `types`, `nativeConnectionString`. The whole
   *   object is also handed to `ConnectionParameters`.
   */
  const Client = (module.exports = function (config) {
    EventEmitter.call(this)

    if (!config) {
      config = {}
    }

    // Promise implementation used by connect(), query() and end()
    this._Promise = config.Promise || global.Promise

    this._types = new TypeOverrides(config.types)
    this.native = new Native({ types: this._types })

    // queue of queries waiting to be submitted, plus lifecycle flags
    this._queryQueue = []
    this._ending = false
    this._connecting = false
    this._connected = false
    this._queryable = true

    // The connection parameters are mirrored onto the client for legacy
    // reasons. TODO: deprecate these mirrored fields.
    const params = new ConnectionParameters(config)
    this.connectionParameters = params
    if (config.nativeConnectionString) {
      params.nativeConnectionString = config.nativeConnectionString
    }

    this.user = params.user

    // Non-enumerable so the password does not show up in stack traces or
    // when the client is console.logged.
    Object.defineProperty(this, 'password', {
      value: params.password,
      writable: true,
      enumerable: false,
      configurable: true,
    })

    this.database = params.database
    this.host = params.host
    this.port = params.port

    // a hash to hold named queries
    this.namedQueries = {}
  })

  // expose the query class and make clients event emitters
  Client.Query = NativeQuery
  util.inherits(Client, EventEmitter)
  /**
   * Fail the active query (if any) and every queued query with `err`.
   *
   * Each failure is delivered on a later tick through the query's
   * `handleError`, after the query's `native` property has been set to this
   * client's native handle. The active-query slot and the queue are cleared
   * synchronously.
   *
   * @param {Error} err - error passed to every pending query.
   */
  Client.prototype._errorAllQueries = function (err) {
    const failLater = (pending) => {
      process.nextTick(() => {
        pending.native = this.native
        pending.handleError(err)
      })
    }

    if (this._hasActiveQuery()) {
      failLater(this._activeQuery)
      this._activeQuery = null
    }

    for (const queued of this._queryQueue) {
      failLater(queued)
    }
    // empty the queue in place so existing references see it drained
    this._queryQueue.length = 0
  }
  /**
   * Connect to the backend.
   *
   * A client can only start connecting once: a second call reports an error
   * to `cb` on the next tick. On success the native 'error' and
   * 'notification' events are forwarded, 'connect' is emitted, the query
   * queue is pulsed and `cb` is called with no arguments.
   *
   * @param {function(Error=): void} cb - called once connected, or with the
   *   error if building the connection string or connecting failed.
   */
  Client.prototype._connect = function (cb) {
    if (this._connecting) {
      process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
      return
    }
    this._connecting = true

    const onNativeConnect = (connectErr) => {
      if (connectErr) {
        this.native.end()
        return cb(connectErr)
      }

      // set internal states to connected
      this._connected = true

      // a native-layer error makes the client unusable and fails all
      // pending queries before being re-emitted
      this.native.on('error', (nativeErr) => {
        this._queryable = false
        this._errorAllQueries(nativeErr)
        this.emit('error', nativeErr)
      })

      this.native.on('notification', (msg) => {
        this.emit('notification', {
          channel: msg.relname,
          payload: msg.extra,
        })
      })

      // signal we are connected now
      this.emit('connect')
      this._pulseQueryQueue(true)
      cb()
    }

    this.connectionParameters.getLibpqConnectionString((lookupErr, conString) => {
      if (lookupErr) return cb(lookupErr)
      // an explicit native connection string wins over the generated one
      const target = this.connectionParameters.nativeConnectionString || conString
      this.native.connect(target, onNativeConnect)
    })
  }
  /**
   * Public connect entry point, usable callback-style or promise-style.
   *
   * @param {function(Error=): void} [callback] - when given it is passed
   *   straight to `_connect` and nothing is returned.
   * @returns {Promise<void>|undefined} without a callback, a promise (built
   *   with the configured Promise implementation) that resolves with no value
   *   on success and rejects with the connection error.
   */
  Client.prototype.connect = function (callback) {
    if (!callback) {
      return new this._Promise((resolve, reject) => {
        this._connect((error) => (error ? reject(error) : resolve()))
      })
    }

    this._connect(callback)
  }
  /**
   * Send a query to the server.
   *
   * This method is highly overloaded. It accepts:
   *  1) a string query, an optional array of parameters and an optional
   *     callback;
   *  2) a config object (per the original notes: query text, optional
   *     `values`, optional `callback`, optional `name` to name and cache the
   *     query plan, optional `rowMode = 'array'`); these are forwarded to
   *     `NativeQuery` as-is;
   *  3) any object with a `submit` function (a "submittable"), optionally
   *     followed by a callback: `query(new Query(...), (err, res) => {})`.
   *
   * A read timeout is armed when `config.query_timeout` or the client's
   * `connectionParameters.query_timeout` is truthy. When it fires, the
   * query's callback (if it has one) receives a 'Query read timeout' error,
   * the query is removed from the queue and its `handleError` is invoked on
   * the next tick.
   *
   * @param {string|object} config - query text, query config or submittable.
   * @param {Array|function} [values] - parameters, or the callback when
   *   `config` is a submittable.
   * @param {function(Error, object): void} [callback] - completion callback.
   * @returns {object|Promise|undefined} the submittable itself when one was
   *   passed; otherwise a promise when no callback was supplied, or
   *   `undefined` when the query has a callback.
   * @throws {TypeError} if `config` is null or undefined.
   */
  Client.prototype.query = function (config, values, callback) {
    if (config === null || config === undefined) {
      throw new TypeError('Client was passed a null or undefined query')
    }

    let query
    let result
    const readTimeout = config.query_timeout || this.connectionParameters.query_timeout

    if (typeof config.submit === 'function') {
      result = query = config
      // accept query(new Query(...), (err, res) => { }) style
      if (typeof values === 'function') {
        config.callback = values
      }
    } else {
      query = new NativeQuery(config, values, callback)
      if (!query.callback) {
        // promise style: settle the returned promise from the query callback
        let resolveOut, rejectOut
        result = new this._Promise((resolve, reject) => {
          resolveOut = resolve
          rejectOut = reject
        }).catch((err) => {
          // re-capture the stack on the error before passing it on
          Error.captureStackTrace(err)
          throw err
        })
        query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))
      }
    }

    if (readTimeout) {
      const queryCallback = query.callback
      // A submittable may legitimately have no callback. Calling the missing
      // callback used to throw "queryCallback is not a function" from inside
      // the timer, so only invoke it when it really is callable.
      const hasCallback = typeof queryCallback === 'function'

      const readTimeoutTimer = setTimeout(() => {
        const error = new Error('Query read timeout')

        process.nextTick(() => {
          // NOTE(review): `this.connection` is not assigned anywhere in the
          // visible code, so this is presumably `undefined` here - confirm.
          query.handleError(error, this.connection)
        })

        if (hasCallback) {
          queryCallback(error)
        }

        // we already returned an error,
        // just do nothing if query completes
        query.callback = () => {}

        // Remove from queue
        const index = this._queryQueue.indexOf(query)
        if (index > -1) {
          this._queryQueue.splice(index, 1)
        }

        this._pulseQueryQueue()
      }, readTimeout)

      // completion cancels the pending timeout before reporting the result
      query.callback = (err, res) => {
        clearTimeout(readTimeoutTimer)
        if (hasCallback) {
          queryCallback(err, res)
        }
      }
    }

    if (!this._queryable) {
      query.native = this.native
      process.nextTick(() => {
        query.handleError(new Error('Client has encountered a connection error and is not queryable'))
      })
      return result
    }

    if (this._ending) {
      query.native = this.native
      process.nextTick(() => {
        query.handleError(new Error('Client was closed and is not queryable'))
      })
      return result
    }

    this._queryQueue.push(query)
    this._pulseQueryQueue()
    return result
  }
  /**
   * Disconnect from the backend server.
   *
   * Marks the client as ending, ends the native handle, fails all pending
   * queries with 'Connection terminated', then on the next tick emits 'end'
   * and invokes the completion callback.
   *
   * If the client is not connected yet, another `end(cb)` call is also
   * registered for the 'connect' event; the native handle is still ended
   * immediately in that case.
   *
   * @param {function(): void} [cb] - called once the client has ended.
   * @returns {Promise<void>|undefined} a promise when no callback is given.
   */
  Client.prototype.end = function (cb) {
    this._ending = true

    if (!this._connected) {
      // bound with the caller's original `cb` (possibly undefined)
      this.once('connect', this.end.bind(this, cb))
    }

    let done = cb
    let result
    if (!done) {
      result = new this._Promise((resolve, reject) => {
        done = (err) => (err ? reject(err) : resolve())
      })
    }

    this.native.end(() => {
      this._errorAllQueries(new Error('Connection terminated'))
      process.nextTick(() => {
        this.emit('end')
        if (done) done()
      })
    })

    return result
  }
  /**
   * Tell whether a query is currently in flight.
   *
   * @returns {*} the falsy `_activeQuery` value itself when no query is set;
   *   otherwise `true` unless the active query's `state` is 'error' or 'end'.
   */
  Client.prototype._hasActiveQuery = function () {
    const active = this._activeQuery
    if (!active) {
      return active
    }
    return active.state !== 'error' && active.state !== 'end'
  }
  /**
   * Start the next queued query when the client is idle.
   *
   * Does nothing while disconnected or while a query is active. When the
   * queue is empty it emits 'drain', except on the pulse made right after
   * connecting. Otherwise the next query becomes the active one, is
   * submitted, and the queue is pulsed again once it emits '_done'.
   *
   * @param {boolean} [initialConnection] - truthy for the first pulse after
   *   connecting; suppresses the 'drain' event.
   */
  Client.prototype._pulseQueryQueue = function (initialConnection) {
    if (!this._connected || this._hasActiveQuery()) {
      return
    }

    const next = this._queryQueue.shift()
    if (next) {
      this._activeQuery = next
      next.submit(this)
      next.once('_done', () => {
        this._pulseQueryQueue()
      })
      return
    }

    if (!initialConnection) {
      this.emit('drain')
    }
  }
  /**
   * Attempt to cancel a query.
   *
   * If `query` is the active query, a cancel request is sent through the
   * native handle. If it is only queued, it is removed from the queue.
   * Otherwise nothing happens.
   *
   * @param {object} query - the query to cancel.
   */
  Client.prototype.cancel = function (query) {
    if (this._activeQuery === query) {
      this.native.cancel(function () {})
      return
    }

    const position = this._queryQueue.indexOf(query)
    if (position !== -1) {
      this._queryQueue.splice(position, 1)
    }
  }
  /**
   * `ref()` is a no-op on this client: the body is empty.
   * NOTE(review): presumably present for interface parity with another
   * client implementation - confirm.
   */
  Client.prototype.ref = function () {
    // intentionally empty
  }

  /**
   * `unref()` is a no-op on this client: the body is empty.
   */
  Client.prototype.unref = function () {
    // intentionally empty
  }
  /**
   * Register a type parser on this client's type overrides.
   *
   * The arguments are forwarded unchanged to `this._types.setTypeParser`.
   *
   * @param {*} oid - type identifier, passed through.
   * @param {*} format - passed through.
   * @param {*} parseFn - passed through.
   * @returns {*} whatever the underlying `setTypeParser` returns.
   */
  Client.prototype.setTypeParser = function (oid, format, parseFn) {
    const overrides = this._types
    return overrides.setTypeParser(oid, format, parseFn)
  }
  /**
   * Look up a type parser from this client's type overrides.
   *
   * The arguments are forwarded unchanged to `this._types.getTypeParser`.
   *
   * @param {*} oid - type identifier, passed through.
   * @param {*} format - passed through.
   * @returns {*} whatever the underlying `getTypeParser` returns.
   */
  Client.prototype.getTypeParser = function (oid, format) {
    const overrides = this._types
    return overrides.getTypeParser(oid, format)
  }