使用foreachPartition的时候发现,除println能正常打印外,其他的代码都没有执行,请大家们分析一下
foreachPartition为什么不执行其他代码呢?代码如下:
data.foreachPartition(row=>{
LogUtils.log.info("["+label+"]=" +"["+TaskContext.getPartitionId+"]")
LogUtils.print("["+label+"]=" +"["+TaskContext.getPartitionId+"]")
val mysqlUtils = new MysqlUtils(mysqlUrl, mysqlUser, mysqlPwd)
mysqlUtils.connect()
mysqlUtils.prePareSql(label,schema)
var count = 0
try{
row.foreach(
line=>{
LogUtils.print("["+label+"]=" +"["+TaskContext.getPartitionId+"]"+line.get(0))
LogUtils.log.info("["+label+"]=" +"["+TaskContext.getPartitionId+"]"+line.get(0))
mysqlUtils.saveWideTable(label,schema,line)
count = count+1
}
)
mysqlUtils.saveRptTable(label,count,TaskContext.getPartitionId)
}catch {
case e: Exception => {
e.printStackTrace()
mysqlUtils.rollback()
LogUtils.log.error(e.getMessage)
}
} finally {
mysqlUtils.commit()
mysqlUtils.disConnect()
}
})