我不想名字重复 2021-12-02 14:29 采纳率: 0%
浏览 8
已结题

akka通信问题,来人吗。。

刚接触akka,目前遇到个问题,例如A,B两公司,现在A调用B公司提供的接口,接口中具体做的事是查询一个hdfs目录,然后将读取文件夹下需要的文件(有多个输入流inputStream),现在B如何才能通过akka将这多个文件流一起返回?如果可以,麻烦给个代码比较全的案例(我是用scala写的代码),我的部分代码如下:
//接口部分代码如下
val conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ip:9000")
val fs: FileSystem = FileSystem.get(conf);
//读取FlowOutportWriter_1文件夹下的一个文件
var inputPath = new Path("/user/dataCenter/FlowOutportWriter_1")
//获取到FlowOutportWriter_1文件夹下的文件状态
val fileStatus: Array[FileStatus] = fs.listStatus(inputPath)
var break = new Breaks
//要返回的文件流
val inputStreamlist: ListBuffer[FSDataInputStream] = ListBuffer[FSDataInputStream]()
//遍历
for (elem <- fileStatus) {
    break.breakable({
    //过滤掉文件不是以part开头的
   if (!elem.getPath.getName.startsWith("part-")) {
       break.break();
   }
   //获取到此文件的输入流,放到要返回的集合中
   val input: FSDataInputStream = fs.open(elem.getPath)
   inputStreamlist += input;
  }
//构建HttpEntity(ContentTypes,Source[ByteString,_])的Source参数
import scala.io.{Source=>IOSource}
val fileContentsSource2: (ListBuffer[FSDataInputStream], String) => Source[ByteString,_] =
      (fileName, enc) => {
        Source
          //这里具体啥意思,其实也不太明白,都是按网上照葫芦画瓢。。
          .fromIterator(IOSource.fromInputStream(fileName.iterator.next(), enc).getLines)
          .map(ByteString.apply(_))
      }

val chunked: Chunked = HttpEntity(ContentTypes.`application/octet-stream`,fileContentsSource2(inputStreamlist,"UTF-8"))
Future.successful(HttpResponse(200,entity=chunked))

接收返回值的代码:

import akka.http.scaladsl.model.{HttpRequest=>akkaHttpRequest}
implicit val system = ActorSystem();
implicit val ex = system.dispatcher;

val response = Http().singleRequest( akkaHttpRequest(HttpMethods.POST,uri="接口地址",entity=传的参数) )
//这都是参考网上写的,有两个问题,为何每次都是"------""11111"先打印?还有runForeach没打印出东西
//而且我都是用得Test测试类调用的,但Test貌似一直没结束。。
response.andThen{
  case Success( HttpResponse(StatusCode.OK,_,res_) )=>{
    println("1111111")
    res.dataBytes.map(_.utf8String).runForeach(println)
  }
}
println("------------------------")

运行结果及报错内容
接下来我该如何构建akka.Response(HttpEntity),因为HttpEntity中的伴生对象没有提供对应的apply方法,但我这几天百度发现可以构建HttpEntity(ContentTypes,Source[ByteString,_]),又遇到两个问题:1.不知道到底返回没有;2.返回后该如何解析,谢谢;能给点具体代码实现吗
我想要达到的结果
  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 12月10日
    • 创建了问题 12月2日

    悬赏问题

    • ¥15 CCF-CSP 2023 第三题 解压缩(50%)
    • ¥30 comfyui openpose报错
    • ¥20 Wpf Datarid单元格闪烁效果的实现
    • ¥15 图像分割、图像边缘提取
    • ¥15 sqlserver执行存储过程报错
    • ¥100 nuxt、uniapp、ruoyi-vue 相关发布问题
    • ¥15 浮窗和全屏应用同时存在,全屏应用输入法无法弹出
    • ¥100 matlab2009 32位一直初始化
    • ¥15 Expected type 'str | PathLike[str]…… bytes' instead
    • ¥15 三极管电路求解,已知电阻电压和三级关放大倍数