问题是:
按照如下触发式停止方法,在运行一段时间,比如一天后,程序不能停止。但是在IDEA测试时可以实现停止,在运行时间不长比如一两个小时后也可以停止。这是为什么。
def stopByMarkFile(streamContext: StreamingContext, log: Logger) = {
val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在
var isStop = false
val hdfs_file_path = "hdfs://0.0.01:9000/lserver/stop" //判断消息文件是否存在,如果存在就停止
while (!isStop) {
isStop = streamContext.awaitTerminationOrTimeout(intervalMills)
if (!isStop && isExistsMarkFile(hdfs_file_path)) {
log.warn("2秒后开始关闭sparstreaming程序.....")
Thread.sleep(2000)
streamContext.stop(true, true)
}
}
}
// 指定目录是否存在文件
def isExistsMarkFile(hdfs_file_path:String):Boolean={
val conf = new Configuration()
val path = new Path(hdfs_file_path)
val fs = path.getFileSystem(conf)
return fs.exists(path)
}