coastarica
coastarica
采纳率100%
2016-02-27 12:30 浏览 2.8k
已采纳

如何使用 Delphi RestClient 读取流数据?

1

OANDA网站提供的REST API可以读取账户信息("https://api-fxtrade.oanda.com/v1/accounts),以及提供流报价(streaming,比如"https://stream-fxtrade.oanda.com/v1/prices?accountId=12345&instruments=AUD_CAD%2CAUD_CHF" )。

目前在XE7版本,使用Delphi提供的RestClient组件可以实现读取账户信息。但是,对于服务端的流数据,由于其传输方式是 chunked transfer encoding,按照账户数据读取方式则会在停在最后Excute无法获得返回的Json字符串。

代码如下:

 function GetStreamingPrices:TQJson 
var 
  ....
begin
      Result := TQJson.Create;
            // Get
     _RESTRequest.Method := rmGET;
        //指定资源位置:'/prices'
     _RESTRequest.Resource := TRequestSourceURITemplate.Prices;
         //生成query串:accountId=12345&instruments=AUD_CAD%2CAUD_CHF 
     _RESTRequest.AddParameter(TQueryPairKey.AccountID,accountlist[0],pkGETorPOST);
     _RESTRequest.AddParameter(TQueryPairKey.Instruments,'EUR_USD',pkGETorPOST);
         //URL 
     ._RestClient.BaseURL := 'https://stream-fxtrade.oanda.com/v1';
        //执行
     _RestRequest.Execute;
        //解析为Json
     Result.tryParse(_RESTResponse);
end;

so,请教,如何使用Delphi XE7 的RestClient 获取 chunked transfer encoding 的流数据?请不吝赐教。


多谢一楼 @caozhy 提供的信息,不过没有没有解决此问题。我再描述一下:
1、上面的代码用于正常的request-response过程,是没有问题的。
2、 问题出在,报价流的获取上。报价服务器在工作时段实际是持续传输,一直没有结束,采取chunked transfer encoding模式持续地向外传输当时的即时价格信息。使用上面代码,就会处在一直等待状态下。

因此,需要达到的目标就是:一,不能等到结束标志才去解析收到的字符串。二,即便间歇获取数据,如果中间的缓冲时间过长,也不能忍受。

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

3条回答 默认 最新

  • 已采纳
    coastarica coastarica 2016-03-01 08:38

    用TIdeventstream方法效率更高:

    ResponseEventStrem:TIdEventStream;

    ResponseEventStrem.OnWrite := IdEventStreamWrite;

    //这个自定义的过程替代了stream的write函数。
    procedure  TRatesStreamWorker.IdEventStreamWrite(const ABuffer: TIdBytes; AOffset: Integer; ACount: Integer; var VResult: Integer);
    

    var
    AJson : TQJson;
    begin
    AJson := TQJson.Create;
    AJson.TryParse(BytesToString(ABuffer,Aoffset,ACount,enUTF8)); //buffer是收到的http的content
    if AJson.Count >0 then Jsons.Add(AJson);
    VResult := ACount; //已经写入流的字节数,这里直接返回为count
    end;

    点赞 评论 复制链接分享
  • caozhy 从今以后生命中的每一秒都属于我爱的人 2016-02-27 22:01
  • coastarica coastarica 2016-03-01 02:35

    花了一点事件搜索资料分析,解决了这个问题
    1、关于chunked encoding transfer 模式:数据包虽然没有contentlength,但是有数据包长度信息的,位置在content的最前面。数据包的格式的详情chunked encoding transfer的包格式

    2、在stackoverflow网站,有一篇2013年的贴子,也提到了相似的需求。有的建议是用从socket做起,作者最终采取了修改idhttp的iohandler中的readbytes和readstream两个函数,并使用继承封装自己的类来实现。

    3、我跟踪idhttp实现发现,indy在处理chunked encoding 数据包的处理是每个包读取,并放入stream中,这个过程是没有问题的,并不存在readbyte函数要缓冲所有的数据包之后才送出给stream的问题,因此上面修改indy函数是没有必要的。

    4、问题出在,对于报价流类型的数据,虽然每个数据包有长度,但是总长度是无限的,而indy在取数据时 ,总是要等待整个数据包全部传输完成,也就是最后的0长度的chunked包作为结束的标志,才会返回。

    5、在RESTClient组件上,理论上RestReponse组件应该有一个Tstream类型的变量,用于存储从RestClient得到的网络包。不过我没有找到,因此采取indyhttp 组件来实现对chunked encoding transfer的读取。

    6、实现的逻辑是,正常获取idhttp的流数据,通过idhttp的onwork事件触发读取操作,从stream中取出刚刚收到的数据包。stream中的数据读取完成后清空,idhttp继续向其填充数据,因此无需担心无限长度情况下的溢出。

    7、这篇文章提到了indhttp接受无限长度流数据的更多细节。解决方法看起来更好: 使用定制write方法的Tstream。或者使用TIdstream,这个类型自带了一个onwrite event可以用于输出。

    我的代码如下

    //在idhttp的组件event上,添加事件关联。每次数据包进来这个事件就会触发
    IdHTTP.OnWork :=  IdHTTPWork;
    
    
    
    rocedure  TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    var
      s:string;
      AJson :TQJson;
    begin
         AJson := TQJson.Create;//用于解析包内容的对象
         ResponseStringStream.Position :=0;
         s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;
         ResponseStringStream.Clear;
         AJson.TryParse(s);
         if AJson.Count >0 then jsons.Add(Ajson);
    end;
    
    
    procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
    var
     AJson :TQJson;
     begin
       AJson := TQJson.Create;
      //这是OTL并行组件提供的函数,用于异步开一个新的Thread,在后台持续读取价格流
      Parallel.async(
        procedure (const task: IOmniTask)
        var
          source:string;
        begin
          source := RatesWorker.RatesURL+'EUR_USD';  //资源请求字符串
          RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream);  //请求数据
        end);
    
       Parallel.Async( //再开一个新的thread用于从stream取数据,避免阻塞主线程。循环方式比较笨,测试先这样了,以后改成消息触发
       procedure
       begin
           while true do
            begin
               if RatesWorker.Jsons.Count >0 then
                 begin
                   AJson:= RatesWorker.Jsons.Extract(RatesWorker.Jsons.First); //取一条记录
                   Memo1.Lines.Add(AJson.ToString);  //显示到memo。子线程访问主线程不妥,以后再改了
               end;
             end;
         end);
    
       end;
    .....
    `
    
    
    点赞 评论 复制链接分享

相关推荐