MySQL驱动阅读------executeQuery查询的过程,基于JDBC-----5.1.26

Statement statement = connection.createStatement();
final ResultSet resultSet = statement.executeQuery(String sql);

上面两句是普通的查询过程。下面分析驱动程序是如何进行查询和数据返回的。本质上,驱动程序通过Connection建立的TCP连接将sql语句发送到MySQL服务器,然后接收MySQL服务器的返回结果,并解析成为ResultSet对象;重点就是这个发送和获取返回值的过程。

/**
 * Creates a new statement bound to this connection and configures the
 * requested result-set type and concurrency on it.
 *
 * @param resultSetType result-set type to set on the new statement
 * @param resultSetConcurrency result-set concurrency to set on the new statement
 * @return the newly created, configured statement
 * @throws SQLException propagated from {@code checkClosed()}, the
 *             {@code StatementImpl} constructor or its setters
 */
public java.sql.Statement createStatement(int resultSetType,
			int resultSetConcurrency) throws SQLException {
		checkClosed();

		final StatementImpl created =
				new StatementImpl(getLoadBalanceSafeProxy(), this.database);

		// Apply the caller-supplied result-set characteristics before handing it out.
		created.setResultSetType(resultSetType);
		created.setResultSetConcurrency(resultSetConcurrency);
		return created;
	}

上面的createStatement方法位于ConnectionImpl类中;下面是StatementImpl类当中的executeQuery方法:

/**
 * Executes the given SQL text and returns the result set it produces.
 *
 * The whole call runs while holding the connection mutex. In order, it:
 * resets the cancellation state, optionally raises net_write_timeout for
 * streaming result sets, applies JDBC escape processing, short-circuits to
 * a ping when the SQL starts with PING_MARKER, closes previously open
 * results (unless the connection is configured to hold them open),
 * optionally schedules a timeout task, switches the connection catalog if
 * it differs from this statement's catalog, and finally hands the SQL to
 * {@code locallyScopedConn.execSQL(...)}.
 *
 * @param sql the SQL text to run
 * @return the result set stored in {@code this.results}
 * @throws SQLException if execution fails; a MySQLTimeoutException or
 *             MySQLStatementCancelledException is thrown when the
 *             statement was flagged as cancelled while executing
 */
public java.sql.ResultSet executeQuery(String sql)
			throws SQLException {
		synchronized (checkClosed().getConnectionMutex()) {
			MySQLConnection locallyScopedConn = this.connection;
			
			this.retrieveGeneratedKeys = false;
			
			resetCancelledState();

			checkNullOrEmptyQuery(sql);

			boolean doStreaming = createStreamingResultSet();

			// Adjust net_write_timeout to a higher value if we're
			// streaming result sets. More often than not, someone runs into
			// an issue where they blow net_write_timeout when using this
			// feature, and if they're willing to hold a result set open
			// for 30 seconds or more, one more round-trip isn't going to hurt
			//
			// This is reset by RowDataDynamic.close().

			if (doStreaming
					&& this.connection.getNetTimeoutForStreamingResults() > 0) {
				executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout="
						+ this.connection.getNetTimeoutForStreamingResults());
			}

			// Escape processing returns either the rewritten SQL directly or a
			// result object that carries it in its escapedSql field.
			if (this.doEscapeProcessing) {
				Object escapedSqlResult = EscapeProcessor.escapeSQL(sql,
						locallyScopedConn.serverSupportsConvertFn(), this.connection);

				if (escapedSqlResult instanceof String) {
					sql = (String) escapedSqlResult;
				} else {
					sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;
				}
			}

			char firstStatementChar = StringUtils.firstNonWsCharUc(sql,
					findStartOfStatement(sql));

			// SQL beginning with PING_MARKER is answered by doPingInstead()
			// and never reaches execSQL().
			if (sql.charAt(0) == '/') {
				if (sql.startsWith(PING_MARKER)) {
					doPingInstead();
				
					return this.results;
				}
			}
			
			checkForDml(sql, firstStatementChar);

			// Close results left over from earlier executions unless the
			// connection asks for them to stay open.
			if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) {
				if (this.results != null) {
					this.results.realClose(false);
				}
				if (this.generatedKeysResults != null) {
					this.generatedKeysResults.realClose(false);
				}
				closeAllOpenResults();
			}

			CachedResultSetMetaData cachedMetaData = null;

			// If there isn't a limit clause in the SQL
			// then limit the number of rows to return in
			// an efficient manner. Only do this if
			// setMaxRows() hasn't been used on any Statements
			// generated from the current Connection (saves
			// a query, and network traffic).
			// NOTE(review): the comment above describes the useMaxRows()
			// branch further down, not the server-fetch shortcut that follows.

			if (useServerFetch()) {
				this.results = createResultSetUsingServerFetch(sql);

				return this.results;
			}

			CancelTask timeoutTask = null;

			String oldCatalog = null;

			try {
				// A cancel task is only scheduled when query timeouts are enabled,
				// a timeout is set, and the server version is at least 5.0.0.
				if (locallyScopedConn.getEnableQueryTimeouts() &&
						this.timeoutInMillis != 0
						&& locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
					timeoutTask = new CancelTask(this);
					locallyScopedConn.getCancelTimer().schedule(timeoutTask,
							this.timeoutInMillis);
				}

				// Temporarily switch the connection to this statement's catalog;
				// the previous catalog is restored in the finally block.
				if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
					oldCatalog = locallyScopedConn.getCatalog();
					locallyScopedConn.setCatalog(this.currentCatalog);
				}

				//
				// Check if we have cached metadata for this query...
				//

				Field[] cachedFields = null;

				if (locallyScopedConn.getCacheResultSetMetadata()) {
					cachedMetaData = locallyScopedConn.getCachedMetaData(sql);

					if (cachedMetaData != null) {
						cachedFields = cachedMetaData.fields;
					}
				}

				if (locallyScopedConn.useMaxRows()) {
					// We need to execute this all together
					// So synchronize on the Connection's mutex (because
					// even queries going through there synchronize
					// on the connection
					// If the SQL text contains "LIMIT", maxRows is passed to
					// execSQL(); otherwise SQL_SELECT_LIMIT is set on the server
					// first and execSQL() is called with -1.
					if (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$
						this.results = locallyScopedConn.execSQL(this, sql,
								this.maxRows, null, this.resultSetType,
								this.resultSetConcurrency,
								doStreaming,
								this.currentCatalog, cachedFields);
					} else {
						if (this.maxRows <= 0) {
							executeSimpleNonQuery(locallyScopedConn,
									"SET SQL_SELECT_LIMIT=DEFAULT");
						} else {
							executeSimpleNonQuery(locallyScopedConn,
									"SET SQL_SELECT_LIMIT=" + this.maxRows);
						}

						statementBegins();
						
						this.results = locallyScopedConn.execSQL(this, sql, -1,
								null, this.resultSetType,
								this.resultSetConcurrency,
								doStreaming,
								this.currentCatalog, cachedFields);

						// The finally block below performs the same restore again.
						if (oldCatalog != null) {
							locallyScopedConn.setCatalog(oldCatalog);
						}
					}
				} else {
					statementBegins();
					
					this.results = locallyScopedConn.execSQL(this, sql, -1, null,
							this.resultSetType, this.resultSetConcurrency,
							doStreaming,
							this.currentCatalog, cachedFields);
				}

				// Surface any exception the timeout task hit while cancelling,
				// then cancel the task and purge it from the timer.
				if (timeoutTask != null) {
					if (timeoutTask.caughtWhileCancelling != null) {
						throw timeoutTask.caughtWhileCancelling;
					}

					timeoutTask.cancel();
					
					locallyScopedConn.getCancelTimer().purge();
					
					timeoutTask = null;
				}

				// If the statement was flagged as cancelled, report it as either
				// a timeout or an explicit cancellation.
				synchronized (this.cancelTimeoutMutex) {
					if (this.wasCancelled) {
						SQLException cause = null;
						
						if (this.wasCancelledByTimeout) {
							cause = new MySQLTimeoutException();
						} else {
							cause = new MySQLStatementCancelledException();
						}
						
						resetCancelledState();
						
						throw cause;
					}
				}
			} finally {
				this.statementExecuting.set(false);
				
				// Still non-null only if the try block exited before the
				// normal cancel/purge path above ran.
				if (timeoutTask != null) {
					timeoutTask.cancel();
					
					locallyScopedConn.getCancelTimer().purge();
				}

				if (oldCatalog != null) {
					locallyScopedConn.setCatalog(oldCatalog);
				}
			}

			this.lastInsertId = this.results.getUpdateID();

			if (cachedMetaData != null) {
				locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData,
						this.results);
			} else {
				if (this.connection.getCacheResultSetMetadata()) {
					locallyScopedConn.initializeResultsMetadataFromCache(sql,
							null /* will be created */, this.results);
				}
			}

			return this.results;
		}
	}

  

	this.results = locallyScopedConn.execSQL(this, sql, -1,
								null, this.resultSetType,
								this.resultSetConcurrency,
								doStreaming,
								this.currentCatalog, cachedFields);
MySQLConnection locallyScopedConn = this.connection;

在ConnectionImpl类当中

public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows,
			Buffer packet, int resultSetType, int resultSetConcurrency,
			boolean streamResults, String catalog,
			Field[] cachedMetadata,
			boolean isBatch) throws SQLException {
		synchronized (getConnectionMutex()) {
			//
			// Fall-back if the master is back online if we've
			// issued queriesBeforeRetryMaster queries since
			// we failed over
			//
	
			long queryStartTime = 0;
	
			int endOfQueryPacketPosition = 0;
	
			if (packet != null) {
				endOfQueryPacketPosition = packet.getPosition();
			}
	
			if (getGatherPerformanceMetrics()) {
				queryStartTime = System.currentTimeMillis();
			}
	
			this.lastQueryFinishedTime = 0; // we're busy!
	
			if ((getHighAvailability())
					&& (this.autoCommit || getAutoReconnectForPools())
					&& this.needsPing && !isBatch) {
				try {
					pingInternal(false, 0);
	
					this.needsPing = false;
				} catch (Exception Ex) {
					createNewIO(true);
				}
			}
	
			try {
				if (packet == null) {
					String encoding = null;
	
					if (getUseUnicode()) {
						encoding = getEncoding();
					}
	
					return this.io.sqlQueryDirect(callingStatement, sql,
							encoding, null, maxRows, resultSetType,
							resultSetConcurrency, streamResults, catalog,
							cachedMetadata);
				}
	
				return this.io.sqlQueryDirect(callingStatement, null, null,
						packet, maxRows, resultSetType,
						resultSetConcurrency, streamResults, catalog,
						cachedMetadata);
			} catch (java.sql.SQLException sqlE) {
				// don't clobber SQL exceptions
	
				if (getDumpQueriesOnException()) {
					String extractedSql = extractSqlFromPacket(sql, packet,
							endOfQueryPacketPosition);
					StringBuffer messageBuf = new StringBuffer(extractedSql
							.length() + 32);
					messageBuf
							.append("

Query being executed when exception was thrown:
");
					messageBuf.append(extractedSql);
					messageBuf.append("

");
	
					sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());
				}
	
				if ((getHighAvailability())) {
					this.needsPing = true;
				} else {
					String sqlState = sqlE.getSQLState();
	
					if ((sqlState != null)
							&& sqlState
									.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {
						cleanup(sqlE);
					}
				}
	
				throw sqlE;
			} catch (Exception ex) {
				if (getHighAvailability()) {
					this.needsPing = true;
				} else if (ex instanceof IOException) {
					cleanup(ex);
				}
	
				SQLException sqlEx = SQLError.createSQLException(
						Messages.getString("Connection.UnexpectedException"),
						SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
				sqlEx.initCause(ex);
				
				throw sqlEx;
			} finally {
				if (getMaintainTimeStats()) {
					this.lastQueryFinishedTime = System.currentTimeMillis();
				}
	
	
				if (getGatherPerformanceMetrics()) {
					long queryTime = System.currentTimeMillis()
							- queryStartTime;
	
					registerQueryExecutionTime(queryTime);
				}
			}
		}
	}

  

在MySqlIO类当中的

 /**
  * Sends a query to the server and reads back all of its results.
  *
  * If {@code query} is non-null, a COM_QUERY packet is built in
  * {@code this.sendPacket} (optionally prefixed with the statement comment)
  * and used as the query packet; otherwise the caller-supplied
  * {@code queryPacket} is sent unchanged. Statement interceptors, when
  * configured, are invoked before and after execution, and profiling /
  * slow-query events are emitted when the corresponding flags are set.
  *
  * @param callingStatement the statement issuing the query, may be null
  * @param query the SQL text, or null when {@code queryPacket} is pre-built
  * @param characterEncoding encoding used to write {@code query}, or null
  * @param queryPacket pre-built packet; replaced by the internal send packet
  *            when {@code query} is non-null
  * @param maxRows forwarded to {@code readAllResults}
  * @param resultSetType forwarded to {@code readAllResults}
  * @param resultSetConcurrency forwarded to {@code readAllResults}
  * @param streamResults forwarded to {@code readAllResults}
  * @param catalog forwarded to {@code readAllResults} and to profiler events
  * @param cachedMetadata forwarded to {@code readAllResults}
  * @return the results, possibly replaced by a statement interceptor
  * @throws Exception on failure; when the calling statement was flagged as
  *             cancelled, a SQLException is replaced by a
  *             MySQLTimeoutException or MySQLStatementCancelledException
  */
 final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query,
            String characterEncoding, Buffer queryPacket, int maxRows,
            int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, Field[] cachedMetadata)
    throws Exception {
        this.statementExecutionDepth++;

        try {
            // A "pre" interceptor may answer the query itself.
            if (this.statementInterceptors != null) {
                ResultSetInternalMethods interceptedResults =
                    invokeStatementInterceptorsPre(query, callingStatement, false);

                if (interceptedResults != null) {
                    return interceptedResults;
                }
            }

            long queryStartTime = 0;
            long queryEndTime = 0;

            String statementComment = this.connection.getStatementComment();

            if (this.connection.getIncludeThreadNamesAsStatementComment()) {
                statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();
            }

            if (query != null) {
                // We don't know exactly how many bytes we're going to get
                // from the query. Since we're dealing with Unicode, the
                // max is 2, so pad it (2 * query) + space for headers
                int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2;

                byte[] commentAsBytes = null;

                if (statementComment != null) {
                    commentAsBytes = StringUtils.getBytes(statementComment, null,
                            characterEncoding, this.connection
                            .getServerCharacterEncoding(),
                            this.connection.parserKnowsUnicode(), getExceptionInterceptor());

                    packLength += commentAsBytes.length;
                    packLength += 6; // for /*[space] [space]*/
                }

                // Reuse the existing send packet when there is one.
                if (this.sendPacket == null) {
                    this.sendPacket = new Buffer(packLength);
                } else {
                    this.sendPacket.clear();
                }

                this.sendPacket.writeByte((byte) MysqlDefs.QUERY);

                if (commentAsBytes != null) {
                    this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);
                    this.sendPacket.writeBytesNoNull(commentAsBytes);
                    this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
                }

                if (characterEncoding != null) {
                    if (this.platformDbCharsetMatches) {
                        this.sendPacket.writeStringNoNull(query, characterEncoding,
                                this.connection.getServerCharacterEncoding(),
                                this.connection.parserKnowsUnicode(),
                                this.connection);
                    } else {
                        // LOAD DATA statements are written with the no-argument
                        // getBytes(query) conversion instead of the connection encoding.
                        if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$
                            this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));
                        } else {
                            this.sendPacket.writeStringNoNull(query,
                                    characterEncoding,
                                    this.connection.getServerCharacterEncoding(),
                                    this.connection.parserKnowsUnicode(),
                                    this.connection);
                        }
                    }
                } else {
                    this.sendPacket.writeStringNoNull(query);
                }

                queryPacket = this.sendPacket;
            }

            byte[] queryBuf = null;
            int oldPacketPosition = 0;

            if (needToGrabQueryFromPacket) {
                queryBuf = queryPacket.getByteBuffer();

                // save the packet position
                oldPacketPosition = queryPacket.getPosition();

                queryStartTime = getCurrentTimeNanosOrMillis();
            }

            if (this.autoGenerateTestcaseScript) {
                String testcaseQuery = null;

                if (query != null) {
                    if (statementComment != null) {
                        testcaseQuery = "/* " + statementComment + " */ " + query;
                    } else {
                        testcaseQuery = query;
                    }
                } else {
                    // Offset 5 skips the bytes that precede the SQL text in the packet.
                    testcaseQuery = StringUtils.toString(queryBuf, 5,
                            (oldPacketPosition - 5));
                }

                StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32);
                this.connection.generateConnectionCommentBlock(debugBuf);
                debugBuf.append(testcaseQuery);
                debugBuf.append(';');
                this.connection.dumpTestcaseQuery(debugBuf.toString());
            }

            // Send query command and sql query string
            Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
                    false, null, 0);

            long fetchBeginTime = 0;
            long fetchEndTime = 0;

            String profileQueryToLog = null;

            boolean queryWasSlow = false;

            if (this.profileSql || this.logSlowQueries) {
                queryEndTime = getCurrentTimeNanosOrMillis();

                boolean shouldExtractQuery = false;

                if (this.profileSql) {
                    shouldExtractQuery = true;
                } else if (this.logSlowQueries) {
                    long queryTime = queryEndTime - queryStartTime;

                    boolean logSlow = false;

                    if (!this.useAutoSlowLog) {
                        logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();
                    } else {
                        logSlow = this.connection.isAbonormallyLongQuery(queryTime);

                        this.connection.reportQueryTime(queryTime);
                    }

                    if (logSlow) {
                        shouldExtractQuery = true;
                        queryWasSlow = true;
                    }
                }

                if (shouldExtractQuery) {
                    // Extract the actual query from the network packet
                    boolean truncated = false;

                    int extractPosition = oldPacketPosition;

                    if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {
                        extractPosition = this.connection.getMaxQuerySizeToLog() + 5;
                        truncated = true;
                    }

                    profileQueryToLog = StringUtils.toString(queryBuf, 5,
                            (extractPosition - 5));

                    if (truncated) {
                        profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$
                    }
                }

                fetchBeginTime = queryEndTime;
            }

            ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType,
                    resultSetConcurrency, streamResults, catalog, resultPacket,
                    false, -1L, cachedMetadata);

            if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {
                StringBuffer mesgBuf = new StringBuffer(48 +
                        profileQueryToLog.length());

                mesgBuf.append(Messages.getString("MysqlIO.SlowQuery",
                        new Object[] {String.valueOf(this.useAutoSlowLog ?
                                " 95% of all queries " : this.slowQueryThreshold),
                        queryTimingUnits,
                        Long.valueOf(queryEndTime - queryStartTime)}));
                mesgBuf.append(profileQueryToLog);

                ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

                // The duration is passed as a long, like every other event in
                // this method: narrowing it to int overflows once the elapsed
                // time exceeds Integer.MAX_VALUE timing units.
                eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY,
                        "", catalog, this.connection.getId(), //$NON-NLS-1$
                        (callingStatement != null) ? callingStatement.getId() : 999,
                                ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                                (queryEndTime - queryStartTime), queryTimingUnits, null,
                                LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf.toString()));

                if (this.connection.getExplainSlowQueries()) {
                    if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
                        explainSlowQuery(queryPacket.getBytes(5,
                                (oldPacketPosition - 5)), profileQueryToLog);
                    } else {
                        this.connection.getLog().logWarn(Messages.getString(
                                "MysqlIO.28") //$NON-NLS-1$
                                +MAX_QUERY_SIZE_TO_EXPLAIN +
                                Messages.getString("MysqlIO.29")); //$NON-NLS-1$
                    }
                }
            }

            if (this.logSlowQueries) {

                ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

                if (this.queryBadIndexUsed && this.profileSql) {
                    eventSink.consumeEvent(new ProfilerEvent(
                            ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                            this.connection.getId(),
                            (callingStatement != null) ? callingStatement.getId()
                                    : 999, ((ResultSetImpl)rs).resultId,
                                    System.currentTimeMillis(),
                                    (queryEndTime - queryStartTime), this.queryTimingUnits,
                                    null,
                                    LogUtils.findCallingClassAndMethod(new Throwable()),
                                    Messages.getString("MysqlIO.33") //$NON-NLS-1$
                                    +profileQueryToLog));
                }

                if (this.queryNoIndexUsed && this.profileSql) {
                    eventSink.consumeEvent(new ProfilerEvent(
                            ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                            this.connection.getId(),
                            (callingStatement != null) ? callingStatement.getId()
                                    : 999, ((ResultSetImpl)rs).resultId,
                                    System.currentTimeMillis(),
                                    (queryEndTime - queryStartTime), this.queryTimingUnits,
                                    null,
                                    LogUtils.findCallingClassAndMethod(new Throwable()),
                                    Messages.getString("MysqlIO.35") //$NON-NLS-1$
                                    +profileQueryToLog));
                }

                if (this.serverQueryWasSlow && this.profileSql) {
                    eventSink.consumeEvent(new ProfilerEvent(
                            ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                            this.connection.getId(),
                            (callingStatement != null) ? callingStatement.getId()
                                    : 999, ((ResultSetImpl)rs).resultId,
                                    System.currentTimeMillis(),
                                    (queryEndTime - queryStartTime), this.queryTimingUnits,
                                    null,
                                    LogUtils.findCallingClassAndMethod(new Throwable()),
                                    Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$
                                    +profileQueryToLog));
                }
            }

            if (this.profileSql) {
                fetchEndTime = getCurrentTimeNanosOrMillis();

                ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

                eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY,
                        "", catalog, this.connection.getId(), //$NON-NLS-1$
                        (callingStatement != null) ? callingStatement.getId() : 999,
                                ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                                (queryEndTime - queryStartTime), this.queryTimingUnits,
                                null,
                                LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog));

                eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH,
                        "", catalog, this.connection.getId(), //$NON-NLS-1$
                        (callingStatement != null) ? callingStatement.getId() : 999,
                                ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                                (fetchEndTime - fetchBeginTime), this.queryTimingUnits,
                                null,
                                LogUtils.findCallingClassAndMethod(new Throwable()), null));
            }

            if (this.hadWarnings) {
                scanForAndThrowDataTruncation();
            }

            // A "post" interceptor may substitute its own result set.
            if (this.statementInterceptors != null) {
                ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(
                        query, callingStatement, rs, false, null);

                if (interceptedResults != null) {
                    rs = interceptedResults;
                }
            }

            return rs;
        } catch (SQLException sqlEx) {
            if (this.statementInterceptors != null) {
                invokeStatementInterceptorsPost(
                        query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case
            }

            // A failure on a statement flagged as cancelled is reported as a
            // timeout or cancellation rather than as the underlying error.
            if (callingStatement != null) {
                synchronized (callingStatement.cancelTimeoutMutex) {
                    if (callingStatement.wasCancelled) {
                        SQLException cause = null;

                        if (callingStatement.wasCancelledByTimeout) {
                            cause = new MySQLTimeoutException();
                        } else {
                            cause = new MySQLStatementCancelledException();
                        }

                        callingStatement.resetCancelledState();

                        throw cause;
                    }
                }
            }

            throw sqlEx;
        } finally {
            this.statementExecutionDepth--;
        }
    }

  

重点看到其中的


// Send query command and sql query string
Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
false, null, 0);

/**
 * Sends one command to the server and, unless {@code skipCheck} is set,
 * reads the reply packet through {@code checkErrorPacket(command)}.
 *
 * When {@code queryPacket} is null, a packet is built in
 * {@code this.sendPacket} from {@code command} and {@code extraData};
 * otherwise the supplied packet is sent as-is. A non-zero
 * {@code timeoutMillis} is applied as the socket timeout for the duration
 * of the call and the previous value is restored afterwards.
 *
 * @param command the command code, written as the first byte of a built packet
 * @param extraData command argument used when the packet is built here
 * @param queryPacket a pre-built packet to send, or null
 * @param skipCheck when true, the reply is not read and null is returned
 * @param extraDataCharEncoding encoding for {@code extraData}, or null
 * @param timeoutMillis socket timeout for this command; 0 leaves it unchanged
 * @return the reply packet, or null when {@code skipCheck} is true
 * @throws SQLException on protocol errors; I/O and socket failures are
 *             converted through {@code SQLError.createCommunicationsException}
 */
final Buffer sendCommand(int command, String extraData, Buffer queryPacket,
        boolean skipCheck, String extraDataCharEncoding, int timeoutMillis)
        throws SQLException {
    	this.commandCount++;
    	
        //
        // We cache these locally, per-command, as the checks
        // for them are in very 'hot' sections of the I/O code
        // and we save 10-15% in overall performance by doing this...
        //
        this.enablePacketDebug = this.connection.getEnablePacketDebug();
        this.readPacketSequence = 0;

        int oldTimeout = 0;
        
        // Apply the per-command socket timeout, remembering the old value so
        // the finally block can restore it.
        if (timeoutMillis != 0) {
        	try {
        		oldTimeout = this.mysqlConnection.getSoTimeout();
				this.mysqlConnection.setSoTimeout(timeoutMillis);
			} catch (SocketException e) {
				throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, 
						lastPacketReceivedTimeMs, e, getExceptionInterceptor());
			}
        }
        
        try {

            checkForOutstandingStreamingData();

            // Clear serverStatus...this value is guarded by an
            // external mutex, as you can only ever be processing
            // one command at a time
            this.oldServerStatus = this.serverStatus;
            this.serverStatus = 0;
            this.hadWarnings = false;
            this.warningCount = 0;

            this.queryNoIndexUsed = false;
            this.queryBadIndexUsed = false;
            this.serverQueryWasSlow = false;
            
            //
            // Compressed input stream needs cleared at beginning
            // of each command execution...
            //
            if (this.useCompression) {
                int bytesLeft = this.mysqlInput.available();

                if (bytesLeft > 0) {
                    this.mysqlInput.skip(bytesLeft);
                }
            }

            try {
                clearInputStream();

                //
                // PreparedStatements construct their own packets,
                // for efficiency's sake.
                //
                // If this is a generic query, we need to re-use
                // the sending packet.
                //
                if (queryPacket == null) {
                    int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 +
                        ((extraData != null) ? extraData.length() : 0) + 2;

                    if (this.sendPacket == null) {
                        this.sendPacket = new Buffer(packLength);
                    }

                    // Every command starts a fresh packet sequence.
                    this.packetSequence = -1;
                    this.compressedPacketSequence = -1;
                    this.readPacketSequence = 0;
                    this.checkPacketSequence = true;
                    this.sendPacket.clear();

                    this.sendPacket.writeByte((byte) command);

                    // These commands carry extraData as a string argument;
                    // PROCESS_KILL carries it as a numeric id instead.
                    if ((command == MysqlDefs.INIT_DB) ||
                            (command == MysqlDefs.CREATE_DB) ||
                            (command == MysqlDefs.DROP_DB) ||
                            (command == MysqlDefs.QUERY) ||
                            (command == MysqlDefs.COM_PREPARE)) {
                        if (extraDataCharEncoding == null) {
                            this.sendPacket.writeStringNoNull(extraData);
                        } else {
                            this.sendPacket.writeStringNoNull(extraData,
                                extraDataCharEncoding,
                                this.connection.getServerCharacterEncoding(),
                                this.connection.parserKnowsUnicode(), this.connection);
                        }
                    } else if (command == MysqlDefs.PROCESS_KILL) {
                        long id = Long.parseLong(extraData);
                        this.sendPacket.writeLong(id);
                    }

                    send(this.sendPacket, this.sendPacket.getPosition());
                } else {
                    this.packetSequence = -1;
                    this.compressedPacketSequence = -1;
                    send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement
                }
            } catch (SQLException sqlEx) {
                // don't wrap SQLExceptions
                throw sqlEx;
            } catch (Exception ex) {
                throw SQLError.createCommunicationsException(this.connection,
                    this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex, getExceptionInterceptor());
            }

            Buffer returnPacket = null;

            // Read the server's reply unless the caller asked to skip it.
            if (!skipCheck) {
                if ((command == MysqlDefs.COM_EXECUTE) ||
                        (command == MysqlDefs.COM_RESET_STMT)) {
                    this.readPacketSequence = 0;
                    this.packetSequenceReset = true;
                }

                returnPacket = checkErrorPacket(command);
            }

            return returnPacket;
        } catch (IOException ioEx) {
            throw SQLError.createCommunicationsException(this.connection,
                this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
        } finally {
        	// Restore the socket timeout that was in effect before this command.
        	// NOTE(review): an exception thrown here replaces any exception
        	// already propagating from the try block.
        	if (timeoutMillis != 0) {
        		try {
        			this.mysqlConnection.setSoTimeout(oldTimeout);
        		} catch (SocketException e) {
    				throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, 
    						lastPacketReceivedTimeMs, e, getExceptionInterceptor());
    			}
        	}
        }
    }

在上述方法中完成了sql数据的发送以及结果的获取:发送由下面的send()方法完成,读取返回包由checkErrorPacket()方法完成,两者都是通过Socket的OutputStream和InputStream完成的。

private final void send(Buffer packet, int packetLen)
        throws SQLException {
        try {
            if (this.maxAllowedPacket > 0 && packetLen > this.maxAllowedPacket) {
                throw new PacketTooBigException(packetLen, this.maxAllowedPacket);
            }

            if ((this.serverMajorVersion >= 4) &&
            		(packetLen - HEADER_LENGTH >= this.maxThreeBytes ||
            		(this.useCompression && packetLen - HEADER_LENGTH >= this.maxThreeBytes - COMP_HEADER_LENGTH)
            		)) {
                sendSplitPackets(packet, packetLen);

            } else {
                this.packetSequence++;

                Buffer packetToSend = packet;
                packetToSend.setPosition(0);
                packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
                packetToSend.writeByte(this.packetSequence);

                if (this.useCompression) {
                	this.compressedPacketSequence++;
                    int originalPacketLen = packetLen;

                    packetToSend = compressPacket(packetToSend, 0, packetLen);
                    packetLen = packetToSend.getPosition();

                    if (this.traceProtocol) {
                        StringBuffer traceMessageBuf = new StringBuffer();

                        traceMessageBuf.append(Messages.getString("MysqlIO.57")); //$NON-NLS-1$
                        traceMessageBuf.append(getPacketDumpToLog(
                                packetToSend, packetLen));
                        traceMessageBuf.append(Messages.getString("MysqlIO.58")); //$NON-NLS-1$
                        traceMessageBuf.append(getPacketDumpToLog(packet,
                                originalPacketLen));

                        this.connection.getLog().logTrace(traceMessageBuf.toString());
                    }
                } else {

                    if (this.traceProtocol) {
                        StringBuffer traceMessageBuf = new StringBuffer();

                        traceMessageBuf.append(Messages.getString("MysqlIO.59")); //$NON-NLS-1$
                        traceMessageBuf.append("host: '");
                        traceMessageBuf.append(host);
                        traceMessageBuf.append("' threadId: '");
                        traceMessageBuf.append(threadId);
                        traceMessageBuf.append("'
");
                        traceMessageBuf.append(packetToSend.dump(packetLen));

                        this.connection.getLog().logTrace(traceMessageBuf.toString());
                    }
                }

                this.mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                this.mysqlOutput.flush();
            }

            if (this.enablePacketDebug) {
                enqueuePacketForDebugging(true, false, packetLen + 5,
                    this.packetHeaderBuf, packet);
            }

            //
            // Don't hold on to large packets
            //
            if (packet == this.sharedSendPacket) {
                reclaimLargeSharedSendPacket();
            }
            
            if (this.connection.getMaintainTimeStats()) {
				this.lastPacketSentTimeMs = System.currentTimeMillis();
			}
        } catch (IOException ioEx) {
            throw SQLError.createCommunicationsException(this.connection,
                this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
        }
    }

  

 /**
  * Reads the next packet from the server into the reusable buffer, runs it
  * through {@code checkErrorPacket(Buffer)}, and returns it.
  *
  * @param command the command the reply belongs to (not used in this body)
  * @return the packet that was read
  * @throws SQLException as thrown while reading or checking the packet; any
  *             other exception raised while reading is converted through
  *             {@code SQLError.createCommunicationsException}
  */
 private Buffer checkErrorPacket(int command) throws SQLException {
        this.serverStatus = 0;

        final Buffer reply;

        try {
            // A java.io.EOFException here means the server has gone away;
            // it is converted below and callers decide how to react.
            reply = reuseAndReadPacket(this.reusablePacket);
        } catch (SQLException passThrough) {
            // SQL exceptions are rethrown unwrapped.
            throw passThrough;
        } catch (Exception unexpected) {
            throw SQLError.createCommunicationsException(this.connection,
                this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, unexpected, getExceptionInterceptor());
        }

        checkErrorPacket(reply);

        return reply;
    }

--------------------------------------------------------

在JDBC驱动的sql请求数据发送给mysql服务器的过程中,可以配置是否采用压缩,压缩采用的是java.util.zip当中的

java.util.zip
类 Deflater

此类使用流行的 ZLIB 压缩程序库为通用压缩提供支持。ZLIB 压缩程序库最初是作为 PNG 图形标准的一部分开发的,不受专利的保护。有关该规范的完整描述,请参见 java.util.zip 包描述。 

协议格式:

packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
packetToSend.writeByte(this.packetSequence);

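也就是说,每个包都以一个包头开始:先写入包体长度(packetLen - HEADER_LENGTH),再写入1个字节的包序号(packetSequence)。这个包头用于处理TCP粘包的问题:MySQL服务器根据包体长度在TCP字节流中划分包的边界,从而将不同的sql请求分离开来。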

我们来看一下JDBC驱动程序中的Connection的相关类图

java.sql.Connection---->com.mysql.jdbc.Connection----->com.mysql.jdbc.MySQLConnection----->com.mysql.jdbc.ConnectionImpl---->JDBC4Connection

原文地址:https://www.cnblogs.com/wuxinliulei/p/5166729.html