关于JdbcRDD实例化mapRow参数

今天碰到一个问题,需要通过JDBC链接数据库,使用Spark读取并处理数据,想到使用JdbcRDD,JdbcRDD的类定义如下

private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
  override def index = idx
}
// TODO: Expose a jdbcRDD function in SparkContext and mark this as semi-private
/**
 * An RDD that executes an SQL query on a JDBC connection and reads results.
 * For usage example, see test case JdbcRDDSuite.
 *
 * @param getConnection a function that returns an open Connection.
 *   The RDD takes care of closing the connection.
 * @param sql the text of the query.
 *   The query must contain two ? placeholders for parameters used to partition the results.
 *   E.g. "select title, author from books where ? <= id and id <= ?"
 * @param lowerBound the minimum value of the first placeholder
 * @param upperBound the maximum value of the second placeholder
 *   The lower and upper bounds are inclusive.
 * @param numPartitions the number of partitions.
 *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 *   the query would be executed twice, once with (1, 10) and once with (11, 20)
 * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
 *   This should only call getInt, getString, etc; the RDD takes care of calling next.
 *   The default maps a ResultSet to an array of Object.
 */
class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging

其中,主要的问题在于mapRow参数的输入,参看注释:“mapRow a function from a ResultSet to a single row of the desired result type(s). This should only call getInt, getString, etc; the RDD takes care of calling next. The default maps a ResultSet to an array of Object. ”mapRow是一个方法,以ResultSet为输入参数,输出是想要得到的类型的单独一行。只能调用getInt、getString等方法,RDD负责调用next。默认将ResultSet映射到一个对象数组。

那么首先看下默认的参数:JdbcRDD.resultSetToObjectArray _,将方法转成偏函数作为参数

def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
    Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
  }

如果默认参数转换成对象数组,则JdbcRDD.saveAsTextFile序列化结果,将在文件中存储为Java对象名称,而非真实结果。如果用for函数yield输出,再存储saveAsTextFile,则每个结果都被Vector包裹。可以看出,所有的结果输出都是以Java对象的形式存储。另外,还有其他一些例子的用法,诸如“r => (r.getString(6),r.getString(11))”单独标注列号等,都无法满足以字符串形式输出所有字段的要求为了改变这一状况,决定自己编写一个mapRow函数。

def maprow(rs: ResultSet): String = {
    @volatile var res: String = ""
    @volatile var i = 1
    val cc = rs.getMetaData.getColumnCount
    while (i < cc) {
        if (i != 1) res = res +", " + rs.getString(i) else res = rs.getString(i)
        i = i + 1
    }
    res
}

使用了while函数,连接每次getString得到的字符串,返回。

关于JdbcRDD的使用可以参考:

http://www.cnblogs.com/yuananyun/p/4281597.html

http://blog.csdn.net/book_mmicky/article/details/38066067

END

原文地址:https://www.cnblogs.com/kevingu/p/4680022.html