刚接触akka,目前遇到个问题,例如A,B两公司,现在A调用B公司提供的接口,接口中具体做的事是查询一个hdfs目录,然后将读取文件夹下需要的文件(有多个输入流inputStream),现在B如何才能通过akka将这多个文件流一起返回?如果可以,麻烦给个代码比较全的案例(我是用scala写的代码),我的部分代码如下:
//接口部分代码如下
val conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ip:9000")
val fs: FileSystem = FileSystem.get(conf);
//读取FlowOutportWriter_1文件夹下的一个文件
var inputPath = new Path("/user/dataCenter/FlowOutportWriter_1")
//获取到FlowOutportWriter_1文件夹下的文件状态
val fileStatus: Array[FileStatus] = fs.listStatus(inputPath)
var break = new Breaks
//要返回的文件流
val inputStreamlist: ListBuffer[FSDataInputStream] = ListBuffer[FSDataInputStream]()
//遍历
for (elem <- fileStatus) {
break.breakable({
//过滤掉文件不是以part开头的
if (!elem.getPath.getName.startsWith("part-")) {
break.break();
}
//获取到此文件的输入流,放到要返回的集合中
val input: FSDataInputStream = fs.open(elem.getPath)
inputStreamlist += input;
}
//构建HttpEntity(ContentTypes,Source[ByteString,_])的Source参数
import scala.io.{Source=>IOSource}
val fileContentsSource2: (ListBuffer[FSDataInputStream], String) => Source[ByteString,_] =
(fileName, enc) => {
Source
//这里具体啥意思,其实也不太明白,都是按网上照葫芦画瓢。。
.fromIterator(IOSource.fromInputStream(fileName.iterator.next(), enc).getLines)
.map(ByteString.apply(_))
}
val chunked: Chunked = HttpEntity(ContentTypes.`application/octet-stream`,fileContentsSource2(inputStreamlist,"UTF-8"))
Future.successful(HttpResponse(200,entity=chunked))
接收返回值的代码:
import akka.http.scaladsl.model.{HttpRequest=>akkaHttpRequest}
implicit val system = ActorSystem();
implicit val ex = system.dispatcher;
val response = Http().singleRequest( akkaHttpRequest(HttpMethods.POST,uri="接口地址",entity=传的参数) )
//这都是参考网上写的,有两个问题,为何每次都是"------"比"11111"先打印?还有runForeach没打印出东西
//而且我都是用得Test测试类调用的,但Test貌似一直没结束。。
response.andThen{
case Success( HttpResponse(StatusCode.OK,_,res_) )=>{
println("1111111")
res.dataBytes.map(_.utf8String).runForeach(println)
}
}
println("------------------------")