Test environment: Scala 2.11.8 and Spark 2.1.0.
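For reference, a minimal build.sbt matching this environment might look like the sketch below (the module name and the "provided" scoping are assumptions, not from the original setup):
name := "textprocess"
version := "0.1"
scalaVersion := "2.11.8"
// Spark modules matching the versions above; "provided" assumes the runtime
// classpath supplies Spark itself.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.1.0" % "provided"
)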
TextProcess.scala, in the standalone jar:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TextProcess {
  def process(col: String, df: Dataset[Row]): DataFrame = {
    // Take the prefix before the first dot as the word. Note that split()
    // takes a regex, so the dot must be escaped; a bare "." matches any
    // character and yields an empty array for inputs like "a.".
    val rdd = df.select(col).rdd.map { x =>
      (x.getAs[String](0).split("\\.")(0), 1)
    }
    val spark = SparkSession.builder().getOrCreate()
    // Sanity check: an inline word count that is independent of the input df.
    val words = Array("one", "two", "two", "three", "three", "three")
    val wordPairsRDD = spark.sparkContext.parallelize(words).map(word => (word, 1))
    val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
    wordCountsWithReduce.foreach { case (word, count) => println(s"$word: $count") }
    // Aggregate the real counts and wrap them in a DataFrame.
    val rd = rdd.reduceByKey(_ + _).map { x => Row.fromSeq(Array(x._1, x._2)) }
    val schema = StructType(Array(
      StructField("word", StringType, false),
      StructField("num", IntegerType, false)))
    spark.createDataFrame(rd, schema)
  }
}
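For comparison, here is a minimal sketch of calling process directly, with the jar on the compile-time classpath instead of loaded reflectively (the package cc.eabour.spark is taken from the stack trace below):
import org.apache.spark.sql.SparkSession
import cc.eabour.spark.TextProcess

object DirectCall {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    // Same single-column "text" input shape as the reflection-based test below.
    val df = Seq("a.", "b.", "b.aa").toDF("text")
    TextProcess.process("text", df).show()
  }
}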
Test invocation code:
import java.io.File
import java.net.URLClassLoader

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ProcessTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    val data = Array("a.", "b.", "b.", "c.", "b.", "d.", "b.", "d.", "b.aa")
    val rdd = spark.sparkContext.parallelize(data, 1).map { x => Row.fromSeq(Seq(x)) }
    val schema = StructType(Array(StructField("text", StringType, false)))
    val df = spark.createDataFrame(rdd, schema)
    df.show()
    invoke(df)
  }

  def invoke(df: Dataset[Row]): Unit = {
    // Load the external jar with a URLClassLoader and call
    // TextProcess.process("text", df) through the static forwarder that
    // scalac generates for the object (hence the null receiver).
    val f = new File("F:\\360CloudUI\\textprocess.jar")
    val loader = new URLClassLoader(Array(f.toURI.toURL), getClass.getClassLoader)
    val clazz = loader.loadClass("cc.eabour.spark.TextProcess")
    val process = clazz.getDeclaredMethod("process", classOf[String], classOf[Dataset[_]])
    process.invoke(null, "text", df)
  }
}
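One variable worth isolating (a sketch, under the assumption that the class is only missing on the executor side): the jar is loaded into a driver-side URLClassLoader but never registered with Spark, so task deserialization cannot resolve the jar's closure classes such as TextProcess$$anonfun$4. Registering the jar with the SparkContext before any action runs would look like:
import org.apache.spark.sql.SparkSession

object ProcessTestWithAddJar {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    // Register the external jar with Spark so that executors can resolve its
    // closure classes during task deserialization. SparkContext.addJar is a
    // standard API; whether it is sufficient for this local-mode reflection
    // setup is an assumption to verify.
    spark.sparkContext.addJar("F:\\360CloudUI\\textprocess.jar")
    // ... then build df and call invoke(df) exactly as in ProcessTest above ...
  }
}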
Error stack trace:
17/09/24 18:43:08 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 5981 bytes)
17/09/24 18:43:08 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/09/24 18:43:08 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ClassNotFoundException: cc.eabour.spark.TextProcess$$anonfun$4
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
17/09/24 18:43:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ClassNotFoundException: cc.eabour.spark.TextProcess$$anonfun$4
(... followed by the same stack trace as above ...)
17/09/24 18:43:08 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
17/09/24 18:43:08 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/09/24 18:43:08 INFO TaskSchedulerImpl: Cancelling stage 1
I can't tell whether this is a Spark usage problem or a problem with the Scala reflective invocation. Any guidance would be appreciated.