Spark-Scala Unit Testing in Practice

Setting Up the Unit Test Framework

The tools and frameworks used are:

  • scalatest_2.11 (3.0.0) - the core test framework

  • mockito-scala_2.11 (1.16.37) - the mocking framework

  • spark-fast-tests_2.11 (0.23.0) - assertions (comparing DataFrames)

  • scalatest-maven-plugin - Maven plugin that runs the tests

Example POM file

<dependencies>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_2.11</artifactId>
        <version>3.0.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.github.mrpowers</groupId>
        <artifactId>spark-fast-tests_2.11</artifactId>
        <version>0.23.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-scala_2.11</artifactId>
        <version>1.16.37</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest-maven-plugin</artifactId>
            <version>1.0</version>
            <configuration>
                <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                <junitxml>.</junitxml>
                <filereports>WDF TestSuite.txt</filereports>
            </configuration>
            <executions>
                <execution>
                    <id>test</id>
                    <goals>
                        <goal>test</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
</build>

All the required jars were configured in Maven. Even so, Spark failed to initialize when the unit tests ran during packaging:

org.apache.maven.surefire.util.SurefireReflectionException: java.lang.reflect.InvocationTargetException; nested exception is java.lang.reflect.InvocationTargetException: null
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/Module
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:196)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:196)
        at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:104)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:514)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.Module
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

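The cause is a jar (dependency) conflict. The fix is to declare the Jackson jars explicitly by adding the following dependencies to the POM: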

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.6.7</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.6.7.1</version>
</dependency>
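Before pinning versions, it can help to check which jar actually supplies the class named in the NoClassDefFoundError. The following is a minimal sketch that uses only the JDK's ProtectionDomain/CodeSource API. ClassLocator and locate are hypothetical helper names. The class names in main are the one from the stack trace and one from jackson-module-scala. Run it on the test classpath (for example from inside a test) so that it sees the same jars as the failing suite.

```scala
import scala.util.Try

// Hypothetical helper: reports which jar or directory a class is loaded from.
object ClassLocator {
  // None if the class is not on the classpath, or if it has no CodeSource
  // (classes from the JDK's bootstrap loader, e.g. java.lang.String).
  def locate(className: String): Option[String] =
    Try(Class.forName(className, false, Thread.currentThread().getContextClassLoader)).toOption
      .flatMap(cls => Option(cls.getProtectionDomain.getCodeSource))
      .flatMap(source => Option(source.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit =
    Seq(
      "com.fasterxml.jackson.databind.Module",                // the class from the stack trace
      "com.fasterxml.jackson.module.scala.DefaultScalaModule" // from jackson-module-scala
    ).foreach { name =>
      println(s"$name -> ${locate(name).getOrElse("not found (or no code source)")}")
    }
}
```

If Module is reported as not found, it is genuinely missing from that classpath. If it resolves to a jar whose path shows an unexpected Jackson version, that jar is the likely source of the conflict.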

ScalaTest offers several testing styles, including FunSuite, FlatSpec, FunSpec, WordSpec, FreeSpec, PropSpec and FeatureSpec.

This project uses FunSuite, which is flexible and closest to the traditional xUnit testing style.

Example

Testing a UDF of the following class:

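import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// TDW is a project-internal trait, not shown here
class Clue(spark: SparkSession, ds: String) extends TDW with Serializable {
    // Remove punctuation and whitespace, keeping only Chinese characters, English letters and digits
    val removeSymbolUdf: UserDefinedFunction = udf((title: String) => {
        val pattern = "[\u4e00-\u9fa5a-zA-Z0-9]+".r
        // Concatenate all matched runs; a NULL title stays NULL instead of throwing an NPE
        if (title == null) null else pattern.findAllIn(title).mkString
    })
}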
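One way to make the UDF logic testable without starting Spark is to keep the string processing in a plain function and wrap it with udf only at the edge. The sketch below is an illustrative refactoring, not the project's code. TitleCleaner and removeSymbol are hypothetical names, the regular expression is the one from removeSymbolUdf, and the null guard is an addition: Spark passes a SQL NULL to a Scala String UDF as null.

```scala
object TitleCleaner {
  // Same pattern as removeSymbolUdf: runs of CJK ideographs (U+4E00..U+9FA5), ASCII letters and digits
  private val Kept = "[\u4e00-\u9fa5a-zA-Z0-9]+".r

  // Concatenates every kept run, dropping punctuation and whitespace; null stays null
  def removeSymbol(title: String): String =
    if (title == null) null
    else Kept.findAllIn(title).mkString
}
```

Clue could then define its UDF as udf((title: String) => TitleCleaner.removeSymbol(title)). The edge cases are then covered by fast, Spark-free assertions, and the DataFrame-level test that follows only needs to check the Spark wiring.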

A trait that initializes Spark for the tests:

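import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  // Local-mode session; getOrCreate reuses a session already running in the same JVM
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("spark test example")
      .config("spark.driver.bindAddress", "127.0.0.1") // bind the driver to the loopback address
      .getOrCreate()
  }

}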

The unit test class:

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class ClueTest extends FunSuite with SparkSessionTestWrapper with MockitoSugar
               with DataFrameComparer with Serializable {
    // Anonymous subclass that stubs out save(...)
    private val obj = new Clue(null, "20210806") {
        override def save(spark: SparkSession, table: String, df: DataFrame, impDate: String): Unit = {}
    }

    test("testRemoveSymbolUdf") {
        import spark.implicits._

        val sourceDF = Seq(
            "1234这里this是is一个a测%试test案$例case     ",
            "这里是。.,;*&?*;&……一个1234测试案例"
        ).toDF("title")

        val actualDF = sourceDF.withColumn("title", obj.removeSymbolUdf(col("title")))

        val expectedDF = Seq(
            "1234这里this是is一个a测试test案例case",
            "这里是一个1234测试案例"
        ).toDF("title")

        // assertSmallDataFrameEquality comes from DataFrameComparer in spark-fast-tests and compares two DataFrames
        assertSmallDataFrameEquality(actualDF, expectedDF)
    }
}

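Test run result

[Screenshot: test run output]

If the output above shows no errors, the test passed.

When packaging with Maven, all the tests are run, as shown below:

[Screenshot: tests running during the Maven build]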

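Mocking framework

The mocking framework used is mockito-scala.

Partial mocks

mock[Class] stubs out every method of the class, so each call returns Mockito's default value for the return type. To mock all methods but call the real implementation for a few of them, write: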

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
    def doSomethingA(): String = "doSomethingA not mocked"
    def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite1 extends FunSuite with MockitoSugar {
    test("testMock") {
        val m = mock[TestMock]
        doCallRealMethod.when(m).doSomethingA() // only doSomethingA runs the real code
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints an empty string (the mock's default value)
    }
}

There is also spy. Methods that are not stubbed call the real implementation, and stubbed methods return the stubbed value.

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class TestMock {
    def doSomethingA(): String = "doSomethingA not mocked"
    def doSomethingB(): String = "doSomethingB not mocked"
}

class TestMockSuite2 extends FunSuite with MockitoSugar {
    test("testMock") {
        val m = spy(new TestMock())
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints doSomethingB not mocked
        when(m.doSomethingB()).thenReturn("mocked") // the real method still runs once here; only its return value is replaced
        // doReturn("mocked!").when(m).doSomethingB() // this form avoids calling the real method
        println(m.doSomethingA()) // prints doSomethingA not mocked
        println(m.doSomethingB()) // prints mocked
    }
}

Mocking Class Methods Inherited from a Trait

In this case you cannot simply mock the trait. One workable approach is to create a subclass and override the method.

Example

trait A {
	def doSomethingA(): String = "not mocked, invoke trait A.doSomethingA"
}

class B extends A {
    
}

Mocking it as below does not work, because b is a separate, real instance and stubbing a mock of A has no effect on it:

val a = mock[A]
when(a.doSomethingA()).thenReturn("mocked")
val b = new B()
println(b.doSomethingA()) // 打印值: not mocked, invoke trait A.doSomethingA

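The correct approach is:

import org.mockito.Mockito
import org.mockito.Answers.CALLS_REAL_METHODS

// Subclass B and override the method inherited from trait A
val obj = new B {
    override def doSomethingA(): String = "mocked"
}
// A Scala anonymous class compiles to a final class, so this may need the mock-maker-inline setting described below
val t = Mockito.mock(obj.getClass, Mockito.withSettings()
                     .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))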
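The override trick on its own is plain Scala and needs no Mockito. Any code that is handed the subclass instance sees the overridden method, whereas a separate mock[A] is never reached. Below is a minimal sketch with the same A and B as the example; stubbedB is a hypothetical helper. The Mockito.mock(..., spiedInstance(obj)) wrapper above then lets Mockito stub and verify calls on top of this instance.

```scala
// Same shapes as in the example above.
trait A {
  def doSomethingA(): String = "not mocked, invoke trait A.doSomethingA"
}

class B extends A

object TraitOverride {
  // Hypothetical helper: a B whose inherited trait method is overridden.
  // Code under test that receives this instance sees the new behaviour.
  def stubbedB(result: String): B = new B {
    override def doSomethingA(): String = result
  }
}
```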

Mock Scala Object

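Scala objects can be mocked with inline mocking. Any test code that needs the mocked object must be written inside the braces of withObjectMocked[D.type], as below:

class D {
    def value(): String = "D_value"
}
class DMock extends D {
    override def value(): String = "mocked"
}
object D {
    def apply(): D = new D()
}
// inside a test of a suite that mixes in org.mockito.MockitoSugar
withObjectMocked[D.type] {
    when(D.apply()).thenReturn(new DMock())
    // test code
}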

Mocking an object can fail with the following error:

org.mockito.exceptions.base.MockitoException: 
Cannot mock/spy class com.pkslow.basic.MockitoTest$FinalPumpkin
Mockito cannot mock/spy because :
 - final class

quote from: How to mock scala object #303 https://github.com/mockito/mockito-scala/issues/303

https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods

The fix, shown in the figure below, is to add a file named org.mockito.plugins.MockMaker under src/test/resources/mockito-extensions whose content is mock-maker-inline.

[Figure: org.mockito.plugins.MockMaker under src/test/resources/mockito-extensions]

However, packaging then fails with:

Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
	at com.dummy.feed.query.TestUtils.getRequestData(TestUtils.java:879)
	at com.dummy.feed.query.resultsetfilters.TestResultSetFiltersManager.<init>(TestResultSetFiltersManager.java:63)
	... 53 more
Caused by: java.lang.IllegalStateException: Could not initialize plugin: interface org.mockito.plugins.MockMaker
	at org.mockito.internal.configuration.plugins.PluginLoader$1.invoke(PluginLoader.java:66)
	at com.sun.proxy.$Proxy10.isTypeMockable(Unknown Source)
	at org.mockito.internal.util.MockUtil.typeMockabilityOf(MockUtil.java:29)
	at org.mockito.internal.util.MockCreationValidator.validateType(MockCreationValidator.java:22)
	at org.mockito.internal.creation.MockSettingsImpl.validatedSettings(MockSettingsImpl.java:186)
	at org.mockito.internal.creation.MockSettingsImpl.confirm(MockSettingsImpl.java:180)
	at org.mockito.internal.MockitoCore.mock(MockitoCore.java:62)
	at org.mockito.Mockito.mock(Mockito.java:1729)
	at org.mockito.Mockito.mock(Mockito.java:1642)
...
Caused by: java.lang.IllegalStateException: Error during attachment using: net.bytebuddy.agent.ByteBuddyAgent$AttachmentProvider$Compound@a5092e63
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:379)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:353)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:321)
	at net.bytebuddy.agent.ByteBuddyAgent.install(ByteBuddyAgent.java:307)
	at org.mockito.internal.creation.bytebuddy.InlineByteBuddyMockMaker.<clinit>(InlineByteBuddyMockMaker.java:102)

quote from: https://stackoverflow.com/questions/49767683/what-does-no-compatible-attachment-provider-is-available-mean

After some debugging I found the problem. The message "No compatible attachment provider is not available" occurs if the agent was called with a jre instead of a jdk.

Unfortunately calling java -version does not return whether java is a jdk or a jre (the message is displaying Java runtime Environment for both).

In my case (OS:Windows) it was tricky, because newer jsdk-installations attach C:\ProgramData\Oracle\Java\javapath to the system path, which contains a jre and not a jdk. The formerly added entry %JAVA_HOME%/bin got hidden through this modification. When I removed the entry C:\ProgramData\Oracle\Java\javapath everything worked fine.

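ByteBuddyAgent.install works when it is called from a JDK but fails when it is called from a JRE. Newer JDK installers also install a public JRE that takes precedence over the JDK's own JRE. The fix is therefore to uninstall that JRE. A JDK installation contains two JREs, the one inside the JDK and a public one, and removing the public JRE causes no problems.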
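Since `java -version` does not distinguish a JDK from a JRE, a quick check from inside the test JVM can help. This is a minimal sketch that assumes Java 8, where a standalone JRE has no system compiler (no lib/tools.jar). javax.tools.ToolProvider.getSystemJavaCompiler returns null when no system compiler is available, so on Java 8 a null result suggests the running java is a plain JRE. It is a heuristic, and JdkCheck is a hypothetical name. Printing java.home also shows which installation the JVM actually came from, which is exactly what the PATH problem described above gets wrong.

```scala
import javax.tools.ToolProvider

// Hypothetical helper: a heuristic JDK-vs-JRE check from inside the running JVM.
object JdkCheck {
  // getSystemJavaCompiler returns null when no system Java compiler is available;
  // on Java 8 that is the case for a standalone JRE.
  def runningOnJdk: Boolean = ToolProvider.getSystemJavaCompiler != null

  def main(args: Array[String]): Unit = {
    // java.home shows which installation this JVM was started from
    println(s"java.home    = ${System.getProperty("java.home")}")
    println(s"runningOnJdk = $runningOnJdk")
  }
}
```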

Mocking objects created with new

Suppose you want to mock the object created with new inside the method below:

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
    def value(): String = "D_value"
}
class TestMock {
    def testNewObject(): String = new D().value()
}
class TestMockSuiteD extends FunSuite with MockitoSugar {
    test("testMock") {
        val d = mock[D]
        when(d.value()).thenReturn("mocked") // this stub has no effect on the D created with new inside the method
        val m = spy(new TestMock())
        println(m.testNewObject()) // prints D_value
    }
}

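With ScalaTest + mockito-scala in the versions used here, I found no way to mock an object created with new, which is a real pitfall in an object-oriented language. The workaround is to move the new into a factory method, such as a companion object's apply. Mocking the object then also takes care of the new: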

import org.mockito.MockitoSugar
import org.scalatest.FunSuite

class D {
    def value(): String = "D_value"
}
class DMock extends D {
    override def value(): String = "mocked"
}
object D {
    // factory method: the only place that calls new D()
    def apply(): D = new D()
}
class TestMock {
    def testNewObject(): String = D().value()
}
class TestMockSuiteD extends FunSuite with MockitoSugar {
    test("testMock") {
        // requires mock-maker-inline (see Mock Scala Object)
        withObjectMocked[D.type] {
            when(D.apply()).thenReturn(new DMock())
            val m = spy(new TestMock())
            println(m.testNewObject()) // prints mocked
        }
    }
}
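A different technique avoids object mocking altogether, and with it the inline mock maker: plain constructor injection. The factory is passed in as a parameter whose default is the production new D(). This is a swapped-in alternative, not the approach the article uses. TestMockInjected and newD are hypothetical names.

```scala
class D {
  def value(): String = "D_value"
}

// Hypothetical variant of TestMock: the factory is a constructor parameter,
// defaulting to the production `new D()`, so a test can hand in its own D.
class TestMockInjected(newD: () => D = () => new D()) {
  def testNewObject(): String = newD().value()
}
```

In a test, `new TestMockInjected(() => new D { override def value(): String = "mocked" })` returns "mocked" from testNewObject, with no Mockito involved. Production code keeps calling `new TestMockInjected()`.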

Serialization issues

When writing unit tests, you will often run into:

Task not serializable
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
...
Caused by: java.io.NotSerializableException: org.mockito.internal.creation.DelegatingMethod
Serialization stack:
	- object not serializable (class: org.mockito.internal.creation.DelegatingMethod, value: org.mockito.internal.creation.DelegatingMethod@a97f2bff)
	- field (class: org.mockito.internal.invocation.ScalaInvocation, name: mockitoMethod, type: interface org.mockito.internal.invocation.MockitoMethod)
	- object (class org.mockito.internal.invocation.ScalaInvocation, clue.toString();)
	- field (class: org.mockito.internal.invocation.InvocationMatcher, name: invocation, type: interface org.mockito.invocation.Invocation)
	- object (class org.mockito.internal.invocation.InvocationMatcher, clue.toString();)

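That is, something in the task cannot be serialized.

Our own classes already implement Serializable, so why does this happen? The cause is spy(new A()). The spied object is created by Mockito, and it references Mockito internals that are not serializable (such as DelegatingMethod in the trace above), so serializing the task fails.

For a user unfamiliar with Mockito's source code, the first resort is Google.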
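Java serialization walks the whole object graph. Implementing Serializable on the outer class is therefore not enough if any object it references is not serializable. The JDK-only sketch below shows this. Helper stands in for a Mockito internal such as DelegatingMethod. serializationError is a hypothetical, rough analogue of the check Spark performs in ClosureCleaner.ensureSerializable: it simply attempts an ObjectOutputStream write.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a Mockito internal such as DelegatingMethod: not Serializable.
class Helper

// Both classes implement Serializable, like Clue does.
class PlainJob(val ds: String) extends Serializable
class WrappedJob(val ds: String) extends Serializable {
  val helper = new Helper // reachable from WrappedJob, so it must be serialized too
}

object SerializationCheck {
  // Attempt a Java serialization and return the failure instead of throwing it.
  def serializationError(obj: AnyRef): Option[NotSerializableException] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e) }
    finally out.close()
  }
}
```

PlainJob serializes cleanly. WrappedJob fails with a NotSerializableException naming Helper, in the same way the Spark task fails on DelegatingMethod.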

quote from: https://stackoverflow.com/questions/53820877/spark-scala-mocking-task-not-serializable

Mocks are not serialisable by default, as it's usually a code smell in unit testing

You can try enabling serialisation by creating the mock like mock[MyType](Mockito.withSettings().serializable()) and see what happens when spark tries to use it.

BTW, I recommend you to use mockito-scala instead of the traditional mockito as it may save you some other problems

Spying in mockito-scala works exactly the same as regular mockito... What you can try is a mock with delegate witch is how spyLambda works, i.e. Mockito.mock(classOf[MyType], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(new MyScalaClass(<some args>))))

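The first suggestion (serializable()) creates a plain mock rather than a spy, so it does not fit. Following the second suggestion (a mock that delegates to a real instance), I changed the code to:

val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable().defaultAnswer(AdditionalAnswers.delegatesTo(new Clue(null, "20210806"))))

It turned out that the methods I stubbed on this object still called the original implementation, so the stubs did not take effect.

No choice but to keep googling...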

quote from: https://github.com/mockito/mockito/issues/537

I think that if the subject of spy or mock method implements Serializable, then Mockito should return serializable instance with no exceptions and current behavior should be treated as a bug!

While this:

mock(class,
withSettings()
.serializable()
.spiedInstance(object)
.defaultAnswer(CALLS_REAL_METHODS));

indeed works, it should be treated only as a workaround until the framework fixes it's behavior.

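Following this suggestion, I made the following change:

val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable()
          .spiedInstance(new Clue(null, "20210806")).defaultAnswer(CALLS_REAL_METHODS))

Of course, CALLS_REAL_METHODS showed up red (unresolved) in the IDE, so I tried writing my own Answer, but that still ended in an error.

Eventually it occurred to me that CALLS_REAL_METHODS must be something Mockito itself provides, so I downloaded the Mockito source and found the import statement in it:

import org.mockito.Answers.CALLS_REAL_METHODS

With this import added, the red highlighting disappeared and the symbol resolved. But a new error was reported: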

2021-08-07 00:07:30,232 - Exception in task 6.0 in stage 17.0 (TID 1437)
java.io.InvalidClassException: java.lang.Void; local class name incompatible with stream class name "void"
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:703)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1939)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1805)
	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1770)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1595)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)

Back to Google again...

quote from: https://stackoverflow.com/questions/56579623/mock-with-void-method-causes-local-class-name-incompatible-with-stream-class-na

The problem seems to be in the way that Mockito serializes its internal proxy classes. That only has a negative effect if the tasks / jobs you run within spark actually get serialized and deserialized.

In org.apache.spark.scheduler.ShuffleMapTask#runTask the task is deserialized. What Spark basically does at that point is:

new JavaDeserializationStream(new ByteBufferInputStream(ByteBuffer.wrap(this.taskBinary.value())), ClassLoader.getSystemClassLoader()).objIn.readObject()

which produces the exact error message vs.

new ObjectInputStream(new ByteArrayInputStream(this.taskBinary.value())).readObject()

which would work and parse the object properly.

In particular there seems to be a mismatch between how Java / Spark expects void methods to be serialized vs. what Mockito actually does: "java.lang.Void" / "Void" vs. "void".

Luckily Mockito lets you specify the way it serializes its mocks:

MockSettings mockSettings = Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS);
writer = mock(Writer.class, mockSettings);

After this change the test should work.
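The quoted answer says Spark resolves classes differently from a plain ObjectInputStream. The JDK-only sketch below reproduces the reported message under one assumption: the deserializer's resolveClass falls back to a name table that maps "void" to the boxed java.lang.Void instead of the primitive void.class. BoxingObjectInputStream and VoidRoundTrip are hypothetical names modeled on that explanation. They are not Spark classes.

```scala
import java.io._

// Hypothetical stream: when Class.forName fails for a primitive name, it falls back
// to a table that maps "void" to the boxed java.lang.Void (not void.class).
class BoxingObjectInputStream(in: InputStream) extends ObjectInputStream(in) {
  private val primitiveFallback: Map[String, Class[_]] = Map("void" -> classOf[java.lang.Void])

  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, getClass.getClassLoader)
    catch {
      case e: ClassNotFoundException => primitiveFallback.getOrElse(desc.getName, throw e)
    }
}

object VoidRoundTrip {
  // Serializes the primitive class object void.class (java.lang.Void.TYPE),
  // which is what a stubbed void method's metadata carries.
  def bytesOfVoidType(): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(java.lang.Void.TYPE)
    out.close()
    bytes.toByteArray
  }

  // Deserializes those bytes with the given kind of ObjectInputStream.
  def readWith(open: InputStream => ObjectInputStream): AnyRef = {
    val in = open(new ByteArrayInputStream(bytesOfVoidType()))
    try in.readObject() finally in.close()
  }
}
```

A plain `new ObjectInputStream(_)` reads void.class back, because the JDK's default resolveClass knows the primitive names. The boxing variant fails with "java.lang.Void; local class name incompatible with stream class name "void"", the same message as in the Spark log. Under this model, SerializableMode.ACROSS_CLASSLOADERS helps because Mockito then serializes its mocks in a different, class-loader-independent form.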

I changed it to:

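import org.mockito.Mockito
import org.mockito.Answers.CALLS_REAL_METHODS
import org.mockito.mock.SerializableMode

// obj is the Clue instance defined in ClueTest
val t = Mockito.mock(classOf[Clue], Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS)
          .spiedInstance(obj).defaultAnswer(CALLS_REAL_METHODS))

Finally, the test runs.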

References

https://medium.com/@bruno.bonanno/introduction-to-mockito-scala-ede30769cbda
https://mrpowers.medium.com/testing-spark-applications-8c590d3215fa
http://agiledon.github.io/blog/2014/01/13/testing-styles-of-scalatest/
https://stackoverflow.com/questions/32109910/how-to-mock-method-in-extended-trait-with-mockito

Original article: https://www.cnblogs.com/chenrj97/p/15467858.html