connection.js 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
  1. // This file was modified by Oracle on June 1, 2021.
  2. // The changes involve new logic to handle an additional ERR Packet sent by
  3. // the MySQL server when the connection is closed unexpectedly.
  4. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  5. // This file was modified by Oracle on June 17, 2021.
  6. // The changes involve logic to ensure the socket connection is closed when
  7. // there is a fatal error.
  8. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  9. // This file was modified by Oracle on September 21, 2021.
  10. // The changes involve passing additional authentication factor passwords
  11. // to the ChangeUser Command instance.
  12. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  13. 'use strict';
  14. const Net = require('net');
  15. const Tls = require('tls');
  16. const Timers = require('timers');
  17. const EventEmitter = require('events').EventEmitter;
  18. const Readable = require('stream').Readable;
  19. const Queue = require('denque');
  20. const SqlString = require('sqlstring');
  21. const { createLRU } = require('lru.min');
  22. const PacketParser = require('./packet_parser.js');
  23. const Packets = require('./packets/index.js');
  24. const Commands = require('./commands/index.js');
  25. const ConnectionConfig = require('./connection_config.js');
  26. const CharsetToEncoding = require('./constants/charset_encodings.js');
  // Module-wide counter: each new Connection takes the current value as its
  // _internalId and then increments it (see the constructor).
  27. let _connectionId = 0;
  // Holds the converter created from the 'named-placeholders' module; stays
  // null until _resolveNamedPlaceholders needs it for the first time.
  28. let convertNamedPlaceholders = null;
  /**
   * A single MySQL protocol connection.
   *
   * Wraps a duplex stream (TCP socket, unix socket or a caller-supplied stream),
   * feeds incoming bytes to a PacketParser and hands every parsed packet to the
   * command currently at the head of an internal command queue.
   *
   * Events emitted by this class: 'connect' (handshake packet), 'error',
   * 'end' and 'warn' (out-of-order packets).
   */
  class Connection extends EventEmitter {
    /**
     * Opens (or adopts) the underlying stream, wires up all stream listeners
     * and, unless `config.isServer` is set, queues the client handshake.
     *
     * @param {object} opts
     * @param {object} opts.config - Connection configuration. Fields read here:
     *   `stream`, `socketPath`, `port`, `host`, `enableKeepAlive`,
     *   `keepAliveInitialDelay`, `maxPreparedStatements`, `charsetNumber`,
     *   `isServer`, `clientFlags` and `connectTimeout`.
     */
    constructor(opts) {
      super();
      this.config = opts.config;
      // TODO: fill defaults
      // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
      // if host is given, connect to host:3306
      // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
      // if there is no host/port and no socketPath parameters?
      if (!opts.config.stream) {
        if (opts.config.socketPath) {
          this.stream = Net.connect(opts.config.socketPath);
        } else {
          this.stream = Net.connect(opts.config.port, opts.config.host);
          // Optionally enable keep-alive on the socket.
          if (this.config.enableKeepAlive) {
            this.stream.on('connect', () => {
              this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
            });
          }
          // Enable TCP_NODELAY flag. This is needed so that the network packets
          // are sent immediately to the server
          this.stream.setNoDelay(true);
        }
      } else if (typeof opts.config.stream === 'function') {
        // if stream is a function, treat it as "stream agent / factory"
        this.stream = opts.config.stream(opts);
      } else {
        this.stream = opts.config.stream;
      }

      this._internalId = _connectionId++;
      // Commands waiting behind the active one.
      this._commands = new Queue();
      // Command currently receiving packets (null when idle).
      this._command = null;
      this._paused = false;
      // Packets that arrived while paused; replayed by resume().
      this._paused_packets = new Queue();
      // Prepared-statement cache; evicted statements are closed.
      this._statements = createLRU({
        max: this.config.maxPreparedStatements,
        onEviction: (_, statement) => {
          statement.close();
        }
      });
      this.serverCapabilityFlags = 0;
      this.authorized = false;
      this.sequenceId = 0;
      this.compressedSequenceId = 0;
      this.threadId = null;
      this._handshakePacket = null;
      this._fatalError = null;
      this._protocolError = null;
      this._outOfOrderPackets = [];
      this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];

      this.stream.on('error', this._handleNetworkError.bind(this));
      // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
      this.packetParser = new PacketParser(p => {
        this.handlePacket(p);
      });
      this.stream.on('data', data => {
        // Any incoming data means the connect attempt did not time out.
        if (this.connectTimeout) {
          Timers.clearTimeout(this.connectTimeout);
          this.connectTimeout = null;
        }
        this.packetParser.execute(data);
      });
      this.stream.on('end', () => {
        // emit the end event so that the pooled connection can close the connection
        this.emit('end');
      });
      this.stream.on('close', () => {
        // we need to set this flag everywhere where we want connection to close
        if (this._closing) {
          return;
        }
        if (!this._protocolError) {
          // no particular error message before disconnect
          this._protocolError = new Error(
            'Connection lost: The server closed the connection.'
          );
          this._protocolError.fatal = true;
          this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
        }
        this._notifyError(this._protocolError);
      });

      let handshakeCommand;
      if (!this.config.isServer) {
        handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
        handshakeCommand.on('end', () => {
          // this happens when handshake finishes early either because there was
          // some fatal error or the server sent an error packet instead of
          // an hello packet (for example, 'Too many connections' error)
          if (
            !handshakeCommand.handshake ||
            this._fatalError ||
            this._protocolError
          ) {
            return;
          }
          this._handshakePacket = handshakeCommand.handshake;
          this.threadId = handshakeCommand.handshake.connectionId;
          this.emit('connect', handshakeCommand.handshake);
        });
        handshakeCommand.on('error', err => {
          this._closing = true;
          this._notifyError(err);
        });
        this.addCommand(handshakeCommand);
      }

      // in case there was no initial handshake but we need to read string, assume it utf-8
      // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
      // will be overwritten with actual encoding value as soon as server handshake packet is received
      this.serverEncoding = 'utf8';

      if (this.config.connectTimeout) {
        const timeoutHandler = this._handleTimeoutError.bind(this);
        this.connectTimeout = Timers.setTimeout(
          timeoutHandler,
          this.config.connectTimeout
        );
      }
    }

    /**
     * Wraps this connection in a PromiseConnection.
     *
     * @param {Function} [promiseImpl] - Passed through to PromiseConnection.
     * @returns {object} A new PromiseConnection bound to this connection.
     */
    promise(promiseImpl) {
      const PromiseConnection = require('../promise').PromiseConnection;
      return new PromiseConnection(this, promiseImpl);
    }

    /**
     * Replacement for addCommand once the connection is closed: rejects the
     * command with a fatal error, via its `onResult` callback when it has one
     * and via the connection 'error' event otherwise.
     *
     * @param {object} cmd - The command that was about to be queued.
     */
    _addCommandClosedState(cmd) {
      const err = new Error(
        "Can't add new command when connection is in closed state"
      );
      err.fatal = true;
      if (cmd.onResult) {
        cmd.onResult(err);
      } else {
        this.emit('error', err);
      }
    }

    /**
     * Puts the connection into its terminal error state: stops reading,
     * disables addCommand/write, notifies queued commands and records the error.
     *
     * @param {Error} err - Marked `fatal` by this method.
     */
    _handleFatalError(err) {
      err.fatal = true;
      // stop receiving packets
      this.stream.removeAllListeners('data');
      this.addCommand = this._addCommandClosedState;
      this.write = () => {
        this.emit('error', new Error("Can't write in closed state"));
      };
      // _notifyError bails out when _fatalError is already set, so the error
      // must be recorded only after the notification.
      this._notifyError(err);
      this._fatalError = err;
    }

    /**
     * Handles an error reported by the underlying stream.
     *
     * @param {Error} err
     */
    _handleNetworkError(err) {
      if (this.connectTimeout) {
        Timers.clearTimeout(this.connectTimeout);
        this.connectTimeout = null;
      }
      // Do not throw an error when a connection ends with a RST,ACK packet
      if (err.code === 'ECONNRESET' && this._closing) {
        return;
      }
      this._handleFatalError(err);
    }

    /**
     * Fired by the connect timer: destroys the stream (when it supports it)
     * and reports a synthetic 'connect ETIMEDOUT' network error.
     */
    _handleTimeoutError() {
      if (this.connectTimeout) {
        Timers.clearTimeout(this.connectTimeout);
        this.connectTimeout = null;
      }
      if (this.stream.destroy) {
        this.stream.destroy();
      }
      const err = new Error('connect ETIMEDOUT');
      err.errorno = 'ETIMEDOUT';
      err.code = 'ETIMEDOUT';
      err.syscall = 'connect';
      this._handleNetworkError(err);
    }

    /**
     * Notifies all commands in the queue and bubbles the error as a connection
     * "error" event when needed. Called on stream error or unexpected
     * termination. Closes the connection afterwards when `err.fatal` is set.
     *
     * @param {Error} err
     */
    _notifyError(err) {
      if (this.connectTimeout) {
        Timers.clearTimeout(this.connectTimeout);
        this.connectTimeout = null;
      }
      // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
      if (this._fatalError) {
        return;
      }
      let command;
      // if there is no active command, notify connection
      // if there are commands and all of them have callbacks, pass error via callback
      let bubbleErrorToConnection = !this._command;
      if (this._command && this._command.onResult) {
        this._command.onResult(err);
        this._command = null;
        // connection handshake is special because we allow it to be implicit
        // if error happened during handshake, but there are others commands in queue
        // then bubble error to other commands and not to connection
      } else if (
        !(
          this._command &&
          this._command.constructor === Commands.ClientHandshake &&
          this._commands.length > 0
        )
      ) {
        bubbleErrorToConnection = true;
      }
      while ((command = this._commands.shift())) {
        if (command.onResult) {
          command.onResult(err);
        } else {
          bubbleErrorToConnection = true;
        }
      }
      // notify connection if some commands in the queue did not have callbacks
      // or if this is pool connection ( so it can be removed from pool )
      if (bubbleErrorToConnection || this._pool) {
        this.emit('error', err);
      }
      // close connection after emitting the event in case of a fatal error
      if (err.fatal) {
        this.close();
      }
    }

    /**
     * Writes raw bytes to the underlying stream. Write errors are routed to
     * _handleNetworkError; a `false` return from the stream emits 'pause' on it.
     *
     * @param {Buffer} buffer
     */
    write(buffer) {
      const result = this.stream.write(buffer, err => {
        if (err) {
          this._handleNetworkError(err);
        }
      });
      if (!result) {
        this.stream.emit('pause');
      }
    }

    // http://dev.mysql.com/doc/internals/en/sequence-id.html
    //
    // The sequence-id is incremented with each packet and may wrap around.
    // It starts at 0 and is reset to 0 when a new command
    // begins in the Command Phase.
    // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html

    /** Resets both the plain and the compressed sequence ids to 0. */
    _resetSequenceId() {
      this.sequenceId = 0;
      this.compressedSequenceId = 0;
    }

    /**
     * Advances the compressed sequence id, wrapping at 256.
     *
     * @param {number} numPackets
     */
    _bumpCompressedSequenceId(numPackets) {
      this.compressedSequenceId += numPackets;
      this.compressedSequenceId %= 256;
    }

    /**
     * Advances the sequence id, wrapping at 256.
     *
     * @param {number} numPackets
     */
    _bumpSequenceId(numPackets) {
      this.sequenceId += numPackets;
      this.sequenceId %= 256;
    }

    /**
     * Sends a packet. Packets shorter than MAX_PACKET_LENGTH are written in one
     * piece; longer ones are cut into chunks, each preceded by its own 4-byte
     * header carrying the current sequence id.
     *
     * @param {object} packet - Must expose `length()`, `writeHeader(seqId)` and
     *   `buffer`.
     */
    writePacket(packet) {
      const MAX_PACKET_LENGTH = 16777215;
      const length = packet.length();
      if (length < MAX_PACKET_LENGTH) {
        packet.writeHeader(this.sequenceId);
        if (this.config.debug) {
          // eslint-disable-next-line no-console
          console.log(
            `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
          );
          // eslint-disable-next-line no-console
          console.log(
            `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
          );
        }
        this._bumpSequenceId(1);
        this.write(packet.buffer);
      } else {
        if (this.config.debug) {
          // eslint-disable-next-line no-console
          console.log(
            `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
          );
          // eslint-disable-next-line no-console
          console.log(
            `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
          );
        }
        // Offset 4 skips the header slot at the start of packet.buffer.
        for (
          let offset = 4;
          offset < 4 + length;
          offset += MAX_PACKET_LENGTH
        ) {
          const chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
          let header;
          if (chunk.length === MAX_PACKET_LENGTH) {
            header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
          } else {
            // 3-byte little-endian chunk length followed by the sequence id.
            header = Buffer.from([
              chunk.length & 0xff,
              (chunk.length >> 8) & 0xff,
              (chunk.length >> 16) & 0xff,
              this.sequenceId
            ]);
          }
          this._bumpSequenceId(1);
          this.write(header);
          this.write(chunk);
        }
      }
    }

    // 0.11+ environment
    /**
     * Upgrades the existing stream to TLS using `config.ssl` and redirects
     * reads and writes through the secure socket.
     *
     * @param {Function} onSecure - Called with no arguments once the secure
     *   channel is established, or with an Error when the handshake or the
     *   server identity check fails.
     */
    startTLS(onSecure) {
      if (this.config.debug) {
        // eslint-disable-next-line no-console
        console.log('Upgrading connection to TLS');
      }
      const secureContext = Tls.createSecureContext({
        ca: this.config.ssl.ca,
        cert: this.config.ssl.cert,
        ciphers: this.config.ssl.ciphers,
        key: this.config.ssl.key,
        passphrase: this.config.ssl.passphrase,
        minVersion: this.config.ssl.minVersion,
        maxVersion: this.config.ssl.maxVersion
      });
      const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
      const verifyIdentity = this.config.ssl.verifyIdentity;
      const servername = this.config.host;
      let secureEstablished = false;
      // The plain-text 'data' listener must go: from now on bytes on the raw
      // stream are TLS records, read through secureSocket instead.
      this.stream.removeAllListeners('data');
      const secureSocket = Tls.connect(
        {
          rejectUnauthorized,
          requestCert: rejectUnauthorized,
          checkServerIdentity: verifyIdentity
            ? Tls.checkServerIdentity
            : function() {
                return undefined;
              },
          secureContext,
          isServer: false,
          socket: this.stream,
          servername
        },
        () => {
          secureEstablished = true;
          if (rejectUnauthorized) {
            if (typeof servername === 'string' && verifyIdentity) {
              const cert = secureSocket.getPeerCertificate(true);
              const serverIdentityCheckError = Tls.checkServerIdentity(
                servername,
                cert
              );
              if (serverIdentityCheckError) {
                onSecure(serverIdentityCheckError);
                return;
              }
            }
          }
          onSecure();
        }
      );
      // error handler for secure socket
      secureSocket.on('error', err => {
        if (secureEstablished) {
          this._handleNetworkError(err);
        } else {
          onSecure(err);
        }
      });
      secureSocket.on('data', data => {
        this.packetParser.execute(data);
      });
      this.write = buffer => secureSocket.write(buffer);
    }

    /**
     * Emits a fatal protocol error on the connection, unless the connection is
     * already closing.
     *
     * @param {string} message
     * @param {string} [code='PROTOCOL_ERROR']
     */
    protocolError(message, code) {
      // Starting with MySQL 8.0.24, if the client closes the connection
      // unexpectedly, the server will send a last ERR Packet, which we can
      // safely ignore.
      // https://dev.mysql.com/worklog/task/?id=12999
      if (this._closing) {
        return;
      }
      const err = new Error(message);
      err.fatal = true;
      err.code = code || 'PROTOCOL_ERROR';
      this.emit('error', err);
    }

    /** @returns {Error|null} The fatal error recorded by _handleFatalError. */
    get fatalError() {
      return this._fatalError;
    }

    /**
     * Dispatches one packet to the active command. Also called without a
     * packet to start a freshly activated command.
     *
     * @param {object} [packet] - Parsed packet; omitted when starting a command.
     */
    handlePacket(packet) {
      if (this._paused) {
        this._paused_packets.push(packet);
        return;
      }
      if (this.config.debug) {
        if (packet) {
          // eslint-disable-next-line no-console
          console.log(
            ` raw: ${packet.buffer
              .slice(packet.offset, packet.offset + packet.length())
              .toString('hex')}`
          );
          // eslint-disable-next-line no-console
          console.trace();
          const commandName = this._command
            ? this._command._commandName
            : '(no command)';
          const stateName = this._command
            ? this._command.stateName()
            : '(no command)';
          // eslint-disable-next-line no-console
          console.log(
            `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
          );
        }
      }
      if (!this._command) {
        const marker = packet.peekByte();
        // If it's an Err Packet, we should use it.
        if (marker === 0xff) {
          const error = Packets.Error.fromPacket(packet);
          this.protocolError(error.message, error.code);
        } else {
          // Otherwise, it means it's some other unexpected packet.
          this.protocolError(
            'Unexpected packet while no commands in the queue',
            'PROTOCOL_UNEXPECTED_PACKET'
          );
        }
        this.close();
        return;
      }
      if (packet) {
        // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0
        if (this.sequenceId !== packet.sequenceId) {
          const err = new Error(
            `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
          );
          err.expected = this.sequenceId;
          err.received = packet.sequenceId;
          this.emit('warn', err); // REVIEW
          // eslint-disable-next-line no-console
          console.error(err.message);
        }
        this._bumpSequenceId(packet.numPackets);
      }
      try {
        if (this._fatalError) {
          // skip remaining packets after client is in the error state
          return;
        }
        const done = this._command.execute(packet, this);
        if (done) {
          this._command = this._commands.shift();
          if (this._command) {
            // A new command starts a new sequence; kick it off without a packet.
            this._resetSequenceId();
            this.handlePacket();
          }
        }
      } catch (err) {
        this._handleFatalError(err);
        this.stream.destroy();
      }
    }

    /**
     * Queues a command, starting it immediately when the connection is idle.
     *
     * @param {object} cmd
     * @returns {object} The same command, for chaining.
     */
    addCommand(cmd) {
      // this.compressedSequenceId = 0;
      // this.sequenceId = 0;
      if (this.config.debug) {
        const commandName = cmd.constructor.name;
        // eslint-disable-next-line no-console
        console.log(`Add command: ${commandName}`);
        cmd._commandName = commandName;
      }
      if (!this._command) {
        this._command = cmd;
        this.handlePacket();
      } else {
        this._commands.push(cmd);
      }
      return cmd;
    }

    /**
     * Interpolates `values` into `sql`. Uses `config.queryFormat` when it is a
     * function, otherwise named-placeholder resolution followed by
     * SqlString.format.
     *
     * @param {string} sql
     * @param {*} values
     * @returns {string}
     */
    format(sql, values) {
      if (typeof this.config.queryFormat === 'function') {
        return this.config.queryFormat.call(
          this,
          sql,
          values,
          this.config.timezone
        );
      }
      const opts = {
        sql: sql,
        values: values
      };
      this._resolveNamedPlaceholders(opts);
      return SqlString.format(
        opts.sql,
        opts.values,
        this.config.stringifyObjects,
        this.config.timezone
      );
    }

    /**
     * @param {*} value
     * @returns {string} Result of SqlString.escape using `config.timezone`.
     */
    escape(value) {
      return SqlString.escape(value, false, this.config.timezone);
    }

    /**
     * @param {*} value
     * @returns {string} Result of SqlString.escapeId.
     */
    escapeId(value) {
      return SqlString.escapeId(value, false);
    }

    /**
     * @param {string} sql
     * @returns {object} Result of SqlString.raw.
     */
    raw(sql) {
      return SqlString.raw(sql);
    }

    /**
     * Rewrites `options.sql` / `options.values` in place from named to
     * positional placeholders when named placeholders are enabled on the
     * connection config or on the options themselves.
     *
     * @param {{sql: string, values: *, namedPlaceholders: (boolean|undefined)}} options
     */
    _resolveNamedPlaceholders(options) {
      if (this.config.namedPlaceholders || options.namedPlaceholders) {
        if (Array.isArray(options.values)) {
          // if an array is provided as the values, assume the conversion is not necessary.
          // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
          return;
        }
        if (convertNamedPlaceholders === null) {
          // Loaded lazily so the module is only required when actually used.
          convertNamedPlaceholders = require('named-placeholders')();
        }
        const unnamed = convertNamedPlaceholders(options.sql, options.values);
        options.sql = unnamed[0];
        options.values = unnamed[1];
      }
    }

    /**
     * Queues a text-protocol query.
     *
     * @param {string|object} sql - SQL text, an options object, or an existing
     *   Commands.Query instance.
     * @param {*} [values] - Bind values, or the callback.
     * @param {Function} [cb]
     * @returns {object} The queued query command.
     */
    query(sql, values, cb) {
      let cmdQuery;
      if (sql.constructor === Commands.Query) {
        cmdQuery = sql;
      } else {
        cmdQuery = Connection.createQuery(sql, values, cb, this.config);
      }
      this._resolveNamedPlaceholders(cmdQuery);
      const rawSql = this.format(
        cmdQuery.sql,
        cmdQuery.values !== undefined ? cmdQuery.values : []
      );
      cmdQuery.sql = rawSql;
      return this.addCommand(cmdQuery);
    }

    /** Stops packet dispatch (packets are buffered) and pauses the stream. */
    pause() {
      this._paused = true;
      this.stream.pause();
    }

    /**
     * Replays packets buffered while paused, then resumes the stream unless a
     * packet handler paused the connection again.
     */
    resume() {
      let packet;
      this._paused = false;
      while ((packet = this._paused_packets.shift())) {
        this.handlePacket(packet);
        // don't resume if packet handler paused connection
        if (this._paused) {
          return;
        }
      }
      this.stream.resume();
    }

    // TODO: named placeholders support
    /**
     * Queues a Prepare command.
     *
     * @param {string|object} options - SQL text or an options object.
     * @param {Function} [cb]
     * @returns {object} The queued Prepare command.
     */
    prepare(options, cb) {
      if (typeof options === 'string') {
        options = { sql: options };
      }
      return this.addCommand(new Commands.Prepare(options, cb));
    }

    /**
     * Removes a statement from the local statement cache and closes it.
     *
     * @param {string|object} sql - SQL text or the options used to prepare it.
     * @returns {object|undefined} The cached statement, if one was found.
     */
    unprepare(sql) {
      let options = {};
      if (typeof sql === 'object') {
        options = sql;
      } else {
        options.sql = sql;
      }
      const key = Connection.statementKey(options);
      const stmt = this._statements.get(key);
      if (stmt) {
        this._statements.delete(key);
        stmt.close();
      }
      return stmt;
    }

    /**
     * Queues a Prepare command followed by an Execute command.
     *
     * Accepts `execute(options, cb)`, `execute(sql, cb)` or
     * `execute(sql, values, cb)`.
     *
     * @param {string|object} sql
     * @param {Array|Function} [values]
     * @param {Function} [cb]
     * @returns {object} The queued Execute command.
     * @throws {TypeError} When the bind parameters are not an array, or contain
     *   `undefined` or a function.
     */
    execute(sql, values, cb) {
      let options = {
        infileStreamFactory: this.config.infileStreamFactory
      };
      if (typeof sql === 'object') {
        // execute(options, cb)
        options = {
          ...options,
          ...sql,
          sql: sql.sql,
          values: sql.values
        };
        if (typeof values === 'function') {
          cb = values;
        } else {
          options.values = options.values || values;
        }
      } else if (typeof values === 'function') {
        // execute(sql, cb)
        cb = values;
        options.sql = sql;
        options.values = undefined;
      } else {
        // execute(sql, values, cb)
        options.sql = sql;
        options.values = values;
      }
      this._resolveNamedPlaceholders(options);
      // check for values containing undefined
      if (options.values) {
        // If namedPlaceholder is not enabled and object is passed as bind parameters
        if (!Array.isArray(options.values)) {
          throw new TypeError(
            'Bind parameters must be array if namedPlaceholders parameter is not enabled'
          );
        }
        options.values.forEach(val => {
          if (val === undefined) {
            throw new TypeError(
              'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
            );
          }
          if (typeof val === 'function') {
            throw new TypeError(
              'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
            );
          }
        });
      }
      const executeCommand = new Commands.Execute(options, cb);
      const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
        if (err) {
          // skip execute command if prepare failed, we have main
          // combined callback here
          executeCommand.start = function() {
            return null;
          };
          if (cb) {
            cb(err);
          } else {
            executeCommand.emit('error', err);
          }
          executeCommand.emit('end');
          return;
        }
        executeCommand.statement = stmt;
      });
      this.addCommand(prepareCommand);
      this.addCommand(executeCommand);
      return executeCommand;
    }

    /**
     * Queues a ChangeUser command. Every field missing from `options` falls
     * back to the value in the connection config.
     *
     * @param {object|Function} [options] - Overrides (`user`, `password`,
     *   `password1`..`password3`, `passwordSha1`, `database`, `timeout`,
     *   `charset`), or the callback.
     * @param {Function} [callback] - Receives the error, if any; the error is
     *   marked `fatal`.
     * @returns {object} The queued ChangeUser command.
     */
    changeUser(options, callback) {
      if (!callback && typeof options === 'function') {
        callback = options;
        options = {};
      }
      // Tolerate changeUser() / changeUser(undefined, cb): use config values only.
      if (!options) {
        options = {};
      }
      const charsetNumber = options.charset
        ? ConnectionConfig.getCharsetNumber(options.charset)
        : this.config.charsetNumber;
      return this.addCommand(
        new Commands.ChangeUser(
          {
            user: options.user || this.config.user,
            // for the purpose of multi-factor authentication, or not, the main
            // password (used for the 1st authentication factor) can also be
            // provided via the "password1" option
            password:
              options.password ||
              options.password1 ||
              this.config.password ||
              this.config.password1,
            password2: options.password2 || this.config.password2,
            password3: options.password3 || this.config.password3,
            passwordSha1: options.passwordSha1 || this.config.passwordSha1,
            database: options.database || this.config.database,
            timeout: options.timeout,
            charsetNumber: charsetNumber,
            currentConfig: this.config
          },
          err => {
            if (err) {
              err.fatal = true;
            }
            if (callback) {
              callback(err);
            }
          }
        )
      );
    }

    // transaction helpers
    /**
     * @param {Function} [cb]
     * @returns {object} The queued 'START TRANSACTION' query command.
     */
    beginTransaction(cb) {
      return this.query('START TRANSACTION', cb);
    }

    /**
     * @param {Function} [cb]
     * @returns {object} The queued 'COMMIT' query command.
     */
    commit(cb) {
      return this.query('COMMIT', cb);
    }

    /**
     * @param {Function} [cb]
     * @returns {object} The queued 'ROLLBACK' query command.
     */
    rollback(cb) {
      return this.query('ROLLBACK', cb);
    }

    /**
     * @param {Function} [cb]
     * @returns {object} The queued Ping command.
     */
    ping(cb) {
      return this.addCommand(new Commands.Ping(cb));
    }

    /**
     * @param {object} opts
     * @param {Function} [cb]
     * @returns {object} The queued RegisterSlave command.
     */
    _registerSlave(opts, cb) {
      return this.addCommand(new Commands.RegisterSlave(opts, cb));
    }

    /**
     * @param {object} opts
     * @param {Function} [cb]
     * @returns {object} The queued BinlogDump command.
     */
    _binlogDump(opts, cb) {
      return this.addCommand(new Commands.BinlogDump(opts, cb));
    }

    // currently just alias to close
    destroy() {
      this.close();
    }

    /**
     * Marks the connection as closing, ends the stream and rejects any command
     * added afterwards.
     */
    close() {
      if (this.connectTimeout) {
        Timers.clearTimeout(this.connectTimeout);
        this.connectTimeout = null;
      }
      this._closing = true;
      this.stream.end();
      this.addCommand = this._addCommandClosedState;
    }

    /**
     * Registers as a replica and returns an object-mode Readable that receives
     * every binlog event pushed by the dump command.
     *
     * @param {object} opts - Passed to both RegisterSlave and BinlogDump; when
     *   bit 0x01 of `opts.flags` is set the connection is closed on 'eof'.
     * @returns {Readable}
     */
    createBinlogStream(opts) {
      // TODO: create proper stream class
      // TODO: use through2
      let test = 1;
      const stream = new Readable({ objectMode: true });
      stream._read = function() {
        return {
          data: test++
        };
      };
      this._registerSlave(opts, () => {
        const dumpCmd = this._binlogDump(opts);
        dumpCmd.on('event', ev => {
          stream.push(ev);
        });
        dumpCmd.on('eof', () => {
          stream.push(null);
          // if non-blocking, then close stream to prevent errors
          if (opts.flags && opts.flags & 0x01) {
            this.close();
          }
        });
        // TODO: pipe errors as well
      });
      return stream;
    }

    /**
     * Reports the outcome of the (implicit) connect to `cb`, at most once:
     * immediately when the result is already known, otherwise on the first
     * 'error' or 'connect' event.
     *
     * @param {Function} [cb] - `(err, handshakeOrConnection)`; nothing happens
     *   when omitted.
     */
    connect(cb) {
      if (!cb) {
        return;
      }
      if (this._fatalError || this._protocolError) {
        return cb(this._fatalError || this._protocolError);
      }
      if (this._handshakePacket) {
        return cb(null, this);
      }
      let connectCalled = 0;
      function callbackOnce(isErrorHandler) {
        return function(param) {
          if (!connectCalled) {
            if (isErrorHandler) {
              cb(param);
            } else {
              cb(null, param);
            }
          }
          connectCalled = 1;
        };
      }
      this.once('error', callbackOnce(true));
      this.once('connect', callbackOnce(false));
    }

    // ===================================
    // outgoing server connection methods
    // ===================================

    /**
     * Writes a result-set header, one column definition per column and an EOF.
     *
     * @param {Array<object>} columns
     */
    writeColumns(columns) {
      this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
      columns.forEach(column => {
        this.writePacket(
          Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
        );
      });
      this.writeEof();
    }

    // row is array of columns, not hash
    /**
     * @param {Array} column - Row values in column order.
     */
    writeTextRow(column) {
      this.writePacket(
        Packets.TextRow.toPacket(column, this.serverConfig.encoding)
      );
    }

    /**
     * @param {Array} column - Row values in column order.
     */
    writeBinaryRow(column) {
      this.writePacket(
        Packets.BinaryRow.toPacket(column, this.serverConfig.encoding)
      );
    }

    /**
     * Writes a complete result set: column definitions, every row, then EOF.
     *
     * @param {Array<object>} rows - Row objects keyed by column name.
     * @param {Array<{name: string}>} columns
     * @param {boolean} [binary=false] - Write rows with writeBinaryRow instead
     *   of writeTextRow.
     */
    writeTextResult(rows, columns, binary = false) {
      this.writeColumns(columns);
      rows.forEach(row => {
        // Build a dense array with exactly one value per column, in column
        // order. (Pre-sizing the array and then pushing would leave
        // columns.length empty slots in front of the values.)
        const arrayRow = columns.map(column => row[column.name]);
        if (binary) {
          this.writeBinaryRow(arrayRow);
        } else {
          this.writeTextRow(arrayRow);
        }
      });
      this.writeEof();
    }

    /**
     * @param {number} [warnings]
     * @param {number} [statusFlags]
     */
    writeEof(warnings, statusFlags) {
      this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
    }

    /**
     * @param {object} [args] - Defaults to `{ affectedRows: 0 }`.
     */
    writeOk(args) {
      if (!args) {
        args = { affectedRows: 0 };
      }
      this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
    }

    /**
     * @param {object} args - Passed to Packets.Error.toPacket.
     */
    writeError(args) {
      // if we want to send error before initial hello was sent, use default encoding
      const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
      this.writePacket(Packets.Error.toPacket(args, encoding));
    }

    /**
     * Stores `args` as the server config (adding the `encoding` derived from
     * `args.characterSet`) and queues a ServerHandshake command.
     *
     * @param {object} args
     * @returns {object} The queued ServerHandshake command.
     */
    serverHandshake(args) {
      this.serverConfig = args;
      this.serverConfig.encoding =
        CharsetToEncoding[this.serverConfig.characterSet];
      return this.addCommand(new Commands.ServerHandshake(args));
    }

    // ===============================================================
    /**
     * Gracefully ends the connection. In server mode the stream is ended on the
     * next tick; otherwise a Quit command is queued and no further commands
     * are accepted.
     *
     * @param {Function} [callback] - Passed to the Quit command (client mode).
     * @returns {object} An emitter that fires 'end' (server mode) or the
     *   queued Quit command (client mode).
     */
    end(callback) {
      if (this.config.isServer) {
        this._closing = true;
        const quitCmd = new EventEmitter();
        setImmediate(() => {
          this.stream.end();
          quitCmd.emit('end');
        });
        return quitCmd;
      }
      // trigger error if more commands enqueued after end command
      const quitCmd = this.addCommand(new Commands.Quit(callback));
      this.addCommand = this._addCommandClosedState;
      return quitCmd;
    }

    /**
     * Builds a Query command from the `query(options, cb)`, `query(sql, cb)`
     * or `query(sql, values, cb)` call shapes.
     *
     * @param {string|object} sql
     * @param {*} [values]
     * @param {Function} [cb]
     * @param {object} config - Supplies `rowsAsArray` and `infileStreamFactory`
     *   defaults.
     * @returns {object} A new Commands.Query.
     */
    static createQuery(sql, values, cb, config) {
      let options = {
        rowsAsArray: config.rowsAsArray,
        infileStreamFactory: config.infileStreamFactory
      };
      if (typeof sql === 'object') {
        // query(options, cb)
        options = {
          ...options,
          ...sql,
          sql: sql.sql,
          values: sql.values
        };
        if (typeof values === 'function') {
          cb = values;
        } else if (values !== undefined) {
          options.values = values;
        }
      } else if (typeof values === 'function') {
        // query(sql, cb)
        cb = values;
        options.sql = sql;
        options.values = undefined;
      } else {
        // query(sql, values, cb)
        options.sql = sql;
        options.values = values;
      }
      return new Commands.Query(options, cb);
    }

    /**
     * Cache key for a prepared statement, built from `nestTables` (type and
     * value), `rowsAsArray` and the SQL text.
     *
     * @param {{nestTables: *, rowsAsArray: *, sql: string}} options
     * @returns {string}
     */
    static statementKey(options) {
      return `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`;
    }
  }
  // The Connection class is the only value exported by this module.
  884. module.exports = Connection;