精通 Filecoin:Lotus真实数据处理之Provider处理存储

上篇,当 Client 接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1 协议的流,然后通过流发送存储交易。处理这个协议的正是 HandleDealStream 方法。这个方法直接调用自身的 receiveDeal 方法进行处理。receiveDeal 方法处理如下:

  1. 从流中读取存储提案 Proposal 对象。
    proposal, err := s.ReadDealProposal()
    

    这里的流对象是 dealStream 对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。

  2. 获取 ipld node 对象。
    proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
  3. 生成矿工交易对象。
    deal := &storagemarket.MinerDeal{    Client:             s.RemotePeer(),    Miner:              p.net.ID(),    ClientDealProposal: *proposal.DealProposal,    ProposalCid:        proposalNd.Cid(),    State:              storagemarket.StorageDealUnknown,    Ref:                proposal.Piece,}
  4. 调用 fsm 状态组的 Begin 的方法,生成一个状态机,并开始跟踪矿工交易对象。
    err = p.deals.Begin(proposalNd.Cid(), deal)
  5. 保存流对象到连接管理器中。
    err = p.conns.AddStream(proposalNd.Cid(), s)
  6. 发送事件到 fsm 状态组,从而开始对交易对象进行处理。
    return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen)

    当处理机收到 ProviderEventOpen 状态事件时,因为初始状态为默认值 0,即 StorageDealUnknown,事件处理器对象经过内部处理找到对应的目的状态为 StorageDealValidating,从而调用其处理函数 ValidateDealProposal 函数进行处理。



1、`ValidateDealProposal` 函数

这个函数用来验证交易提案对象。

  1. 调用 Lotus Provider 适配器对象的 GetChainHead 方法,获取区块链顶部 tipset key 和其高度。
    tok, height, err := environment.Node().GetChainHead(ctx.Context())

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err))}

  2. 验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。
    if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err))}
  3. 检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。
    proposal := deal.Proposal

    if proposal.Provider != environment.Address() {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal"))}

  4. 检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。
    if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired"))}
  5. 检查费用是否OK,如果不OK,则发送拒绝事件。
    minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30))if proposal.StoragePricePerEpoch.LessThan(minPrice) {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice))}
  6. 检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。
    if proposal.PieceSize < environment.Ask().MinPieceSize {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize))}

    if proposal.PieceSize > environment.Ask().MaxPieceSize {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize))}

  7. 获取客户的资金。
    clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok)if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err))}
  8. 如果客户可用资金小于总的交易费用,则发送拒绝事件。
    if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small"))}
  9. 如果交易是验证过的,则进行验证。
  10. fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealDeciding)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealUnknown 修改为 StorageDealAcceptWait,从而调用其处理函数 DecideOnProposal 确定是否接收交易。

2、`DecideOnProposal` 函数

这个函数用来决定接受或拒绝交易。

  1. 调用环境对象的 RunCustomDecisionLogic 方法,运行自定义逻辑来验证是不接收客户交易。
    accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err))}

  2. 如果不接收,则发送拒绝事件。
    if !accept {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason))}
  3. 调用环境对象的 SendSignedResponse 方法,发送签名的响应给客户端。
    err = environment.SendSignedResponse(ctx.Context(), &network.Response{    State:    storagemarket.StorageDealWaitingForData,    Proposal: deal.ProposalCid,})

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)}

    这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。

  4. 断开与客户端的连接。
    if err := environment.Disconnect(deal.ProposalCid); err != nil {    log.Warnf("closing client connection: %+v", err)}
  5. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
    return ctx.Trigger(storagemarket.ProviderEventDataRequested)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealAcceptWait 修改为 StorageDealWaitingForData,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

    当数据开始传输时,数据传输组件发送 ProviderEventDataTransferInitiated 事件,经过事件处理器把状态从 StorageDealWaitingForData 修改为 StorageDealTransferring,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

    当数据传输完成时,数据传输组件发送 ProviderEventDataTransferCompleted 事件,经过事件处理器把状态从 StorageDealTransferring 修改为 StorageDealVerifyData,从而调用其处理函数 VerifyData 验证数据。

3、`VerifyData` 函数

这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。

VerifyData 函数流程如下:

  1. 调用环境对象的 GeneratePieceCommitmentToFile 方法,生成碎片的 CID 、碎片所在目录和元数据目录。
    pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector())
    

    GeneratePieceCommitmentToFile 方法内容如下:

    • 如果矿工设置了 universalRetrievalEnabled 标志,则直接调用 GeneratePieceCommitmentWithMetadata 函数进行处理。
      if p.p.universalRetrievalEnabled {
          return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector)
      }
      

      universalRetrievalEnabled 标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。

    • 否则,调用 piece IO 对象的 GeneratePieceCommitmentToFile 方法进行处理。
      pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector)

      payloadCid 表示根 Root CID。

      piece IO 对象的 GeneratePieceCommitmentToFile 方法处理如下:

      • 调用文件存储对象的 CreateTemp 方法,创建一个临时文件。
        f, err := pio.store.CreateTemp()
        
      • 生成一个清理函数。
        cleanup := func() {
            f.Close()
            _ = pio.store.Delete(f.Path())
        }
        
      • 从底层存储对象中获取指定 CID 的内容,然后写入指定文件。
        err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...)
        
      • 获取文件大小,即碎片大小。
        pieceSize := uint64(f.Size())
        
      • 定位到文件开头位置。
        _, err = f.Seek(0, io.SeekStart)
        
      • 使用文件内容生成碎片 ID。
        commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize)
        
      • 关闭文件。
        _ = f.Close()
        
      • 返回碎片 CID 和文件路径。
        return commitment, f.Path(), paddedSize, nil
        
    • 返回碎片 CID 和碎片路径。
      return pieceCid, piecePath, filestore.Path(""), err
      
  2. 验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。
    if pieceCid != deal.Proposal.PieceCID {
      return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP"))
    }
    

3. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。

    return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath)

当状态机收到这个事件后,经过事件处理器把状态从 `StorageDealVerifyData` 修改为 `StorageDealEnsureProviderFunds`,从而调用其处理函数 `EnsureProviderFunds` 确定是否接收交易。同时,在调用处理函数之前,通过 `Action` 函数,修改矿工交易对象的 `PiecePath` 和 `MetadataPath` 两个属性。

4、`EnsureProviderFunds` 函数

这个函数用来确定矿工有足够的资金来处理当前交易。

  1. 获取 Lotus Provider 适配器。
    node := environment.Node()
    
  2. 获取区块链顶部 tipset 对应的 key 和高度。
    tok, _, err := node.GetChainHead(ctx.Context())

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err))}

  3. 获取矿工的 worker 地址。
    waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err))}

  4. 调用 Lotus Provider 适配器的 EnsureFunds 方法,确保矿工有足够的资金来处理当前交易。
    mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err))}

  5. 如果返回的 mcid 是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
    if mcid == cid.Undef {    return ctx.Trigger(storagemarket.ProviderEventFunded)}
  6. 否则,调用 fsm 上下文对象的 Trigger 方法,发送另一个事件。
    return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureProviderFunds 修改为 StorageDealProviderFunding,从而调用其处理函数 WaitForFunding 等待产一步的消息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。

5、`WaitForFunding` 函数

这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger 方法,发送一个事件。

函数内容如下:

node := environment.Node()

return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {    if err != nil {        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))    }    if code != exitcode.Ok {        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))    }    return ctx.Trigger(storagemarket.ProviderEventFunded)})

当状态机收到 ProviderEventFunded 这个事件后,经过事件处理器把状态从 StorageDealProviderFunding 修改为 StorageDealPublish,从而调用其处理函数 PublishDeal 把交易信息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。

6、`PublishDeal` 函数

这个函数主要用来提交交易信息上链。

  1. 生成矿工交易对象。
    smDeal := storagemarket.MinerDeal{
        Client:             deal.Client,
        ClientDealProposal: deal.ClientDealProposal,
        ProposalCid:        deal.ProposalCid,
        State:              deal.State,
        Ref:                deal.Ref,
    }
    
  2. 调用 Lotus Provider 适配器对象的 PublishDeals 把交易信息上链。
    mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal)
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err))
    }
    
  3. 调用 fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid)
    

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublish 修改为 StorageDealPublishing,从而调用其处理函数 WaitForPublish 等待交易信息上链。

7、`WaitForPublish` 函数

这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing 修改为 StorageDealStaged,从而调用其处理函数 HandoffDeal 开始扇区密封处理。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 ConnectionClosedDealID 属性。

return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
    }
    if code != exitcode.Ok {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
    }
    var retval market.PublishStorageDealsReturn
    err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
    }

    return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])})

8、`HandoffDeal` 函数

这个函数调用 miner 的 Provide 适配器的

  1. 使用碎片路径生成文件对象。
    file, err := environment.FileStore().Open(deal.PiecePath)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))}

  2. 使用碎片文件流生成碎片流。
    paddedReader, paddedSize := padreader.New(file, uint64(file.Size()))
  3. 调用 Lotus Provider 适配器对象的 OnDealComplete 方法,通知交易已经完成,从而把碎片加入某个扇区中。
    err = environment.Node().OnDealComplete(    ctx.Context(),    storagemarket.MinerDeal{        Client:             deal.Client,        ClientDealProposal: deal.ClientDealProposal,        ProposalCid:        deal.ProposalCid,        State:              deal.State,        Ref:                deal.Ref,        DealID:             deal.DealID,        FastRetrieval:      deal.FastRetrieval,  PiecePath:          filestore.Path(environment.FileStore().Filename(deal.PiecePath)),},    paddedSize,    paddedReader,)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)}

  4. 调用 fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealHandedOff)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStaged 修改为 StorageDealSealing,从而调用其处理函数 VerifyDealActivated 等待扇区密封结果。

9、`VerifyDealActivated` 函数

  1. 生成回调函数。
    cb := func(err error) {    if err != nil {        _ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)    } else {        _ = ctx.Trigger(storagemarket.ProviderEventDealActivated)    }}

    当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealSealing 修改为 StorageDealActive,从而调用其处理函数 RecordPieceInfo 记录相关信息。

  2. 调用 Lotus Provider 适配器对象的 OnDealSectorCommitted 方法,等待扇区被提交。
    err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)}

  3. 返回空。
    return nil

9、`RecordPieceInfo` 函数

这个函数主要记录相关信息。

最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive 修改为 StorageDealCompleted,最终结束状态机处理。

这里会删除碎片的临时文件。

本文链接:https://www.8btc.com/article/632253
转载请注明文章出处

转载声明:本文 由CoinON抓取收录,观点仅代表作者本人,不代表CoinON资讯立场,CoinON不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。若以此作为投资依据,请自行承担全部责任。

声明:图文来源于网络,如有侵权请联系删除

风险提示:投资有风险,入市需谨慎。本资讯不作为投资理财建议。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 2020年8月7日 上午7:59
下一篇 2020年8月7日 上午8:58

相关推荐

精通 Filecoin:Lotus真实数据处理之Provider处理存储

星期五 2020-08-07 8:07:13

上篇,当 Client 接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1 协议的流,然后通过流发送存储交易。处理这个协议的正是 HandleDealStream 方法。这个方法直接调用自身的 receiveDeal 方法进行处理。receiveDeal 方法处理如下:

  1. 从流中读取存储提案 Proposal 对象。
    proposal, err := s.ReadDealProposal()
    

    这里的流对象是 dealStream 对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。

  2. 获取 ipld node 对象。
    proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
  3. 生成矿工交易对象。
    deal := &storagemarket.MinerDeal{    Client:             s.RemotePeer(),    Miner:              p.net.ID(),    ClientDealProposal: *proposal.DealProposal,    ProposalCid:        proposalNd.Cid(),    State:              storagemarket.StorageDealUnknown,    Ref:                proposal.Piece,}
  4. 调用 fsm 状态组的 Begin 的方法,生成一个状态机,并开始跟踪矿工交易对象。
    err = p.deals.Begin(proposalNd.Cid(), deal)
  5. 保存流对象到连接管理器中。
    err = p.conns.AddStream(proposalNd.Cid(), s)
  6. 发送事件到 fsm 状态组,从而开始对交易对象进行处理。
    return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen)

    当处理机收到 ProviderEventOpen 状态事件时,因为初始状态为默认值 0,即 StorageDealUnknown,事件处理器对象经过内部处理找到对应的目的状态为 StorageDealValidating,从而调用其处理函数 ValidateDealProposal 函数进行处理。



1、`ValidateDealProposal` 函数

这个函数用来验证交易提案对象。

  1. 调用 Lotus Provider 适配器对象的 GetChainHead 方法,获取区块链顶部 tipset key 和其高度。
    tok, height, err := environment.Node().GetChainHead(ctx.Context())

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err))}

  2. 验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。
    if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err))}
  3. 检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。
    proposal := deal.Proposal

    if proposal.Provider != environment.Address() {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal"))}

  4. 检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。
    if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired"))}
  5. 检查费用是否OK,如果不OK,则发送拒绝事件。
    minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30))if proposal.StoragePricePerEpoch.LessThan(minPrice) {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice))}
  6. 检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。
    if proposal.PieceSize < environment.Ask().MinPieceSize {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize))}

    if proposal.PieceSize > environment.Ask().MaxPieceSize {    return ctx.Trigger(storagemarket.ProviderEventDealRejected,        xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize))}

  7. 获取客户的资金。
    clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok)if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err))}
  8. 如果客户可用资金小于总的交易费用,则发送拒绝事件。
    if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small"))}
  9. 如果交易是验证过的,则进行验证。
  10. fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealDeciding)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealUnknown 修改为 StorageDealAcceptWait,从而调用其处理函数 DecideOnProposal 确定是否接收交易。

2、`DecideOnProposal` 函数

这个函数用来决定接受或拒绝交易。

  1. 调用环境对象的 RunCustomDecisionLogic 方法,运行自定义逻辑来验证是不接收客户交易。
    accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err))}

  2. 如果不接收,则发送拒绝事件。
    if !accept {    return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason))}
  3. 调用环境对象的 SendSignedResponse 方法,发送签名的响应给客户端。
    err = environment.SendSignedResponse(ctx.Context(), &network.Response{    State:    storagemarket.StorageDealWaitingForData,    Proposal: deal.ProposalCid,})

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)}

    这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。

  4. 断开与客户端的连接。
    if err := environment.Disconnect(deal.ProposalCid); err != nil {    log.Warnf("closing client connection: %+v", err)}
  5. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
    return ctx.Trigger(storagemarket.ProviderEventDataRequested)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealAcceptWait 修改为 StorageDealWaitingForData,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

    当数据开始传输时,数据传输组件发送 ProviderEventDataTransferInitiated 事件,经过事件处理器把状态从 StorageDealWaitingForData 修改为 StorageDealTransferring,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

    当数据传输完成时,数据传输组件发送 ProviderEventDataTransferCompleted 事件,经过事件处理器把状态从 StorageDealTransferring 修改为 StorageDealVerifyData,从而调用其处理函数 VerifyData 验证数据。

3、`VerifyData` 函数

这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。

VerifyData 函数流程如下:

  1. 调用环境对象的 GeneratePieceCommitmentToFile 方法,生成碎片的 CID 、碎片所在目录和元数据目录。
    pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector())
    

    GeneratePieceCommitmentToFile 方法内容如下:

    • 如果矿工设置了 universalRetrievalEnabled 标志,则直接调用 GeneratePieceCommitmentWithMetadata 函数进行处理。
      if p.p.universalRetrievalEnabled {
          return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector)
      }
      

      universalRetrievalEnabled 标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。

    • 否则,调用 piece IO 对象的 GeneratePieceCommitmentToFile 方法进行处理。
      pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector)

      payloadCid 表示根 Root CID。

      piece IO 对象的 GeneratePieceCommitmentToFile 方法处理如下:

      • 调用文件存储对象的 CreateTemp 方法,创建一个临时文件。
        f, err := pio.store.CreateTemp()
        
      • 生成一个清理函数。
        cleanup := func() {
            f.Close()
            _ = pio.store.Delete(f.Path())
        }
        
      • 从底层存储对象中获取指定 CID 的内容,然后写入指定文件。
        err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...)
        
      • 获取文件大小,即碎片大小。
        pieceSize := uint64(f.Size())
        
      • 定位到文件开头位置。
        _, err = f.Seek(0, io.SeekStart)
        
      • 使用文件内容生成碎片 ID。
        commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize)
        
      • 关闭文件。
        _ = f.Close()
        
      • 返回碎片 CID 和文件路径。
        return commitment, f.Path(), paddedSize, nil
        
    • 返回碎片 CID 和碎片路径。
      return pieceCid, piecePath, filestore.Path(""), err
      
  2. 验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。
    if pieceCid != deal.Proposal.PieceCID {
      return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP"))
    }
    

3. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。

    return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath)

当状态机收到这个事件后,经过事件处理器把状态从 `StorageDealVerifyData` 修改为 `StorageDealEnsureProviderFunds`,从而调用其处理函数 `EnsureProviderFunds` 确定是否接收交易。同时,在调用处理函数之前,通过 `Action` 函数,修改矿工交易对象的 `PiecePath` 和 `MetadataPath` 两个属性。

4、`EnsureProviderFunds` 函数

这个函数用来确定矿工有足够的资金来处理当前交易。

  1. 获取 Lotus Provider 适配器。
    node := environment.Node()
    
  2. 获取区块链顶部 tipset 对应的 key 和高度。
    tok, _, err := node.GetChainHead(ctx.Context())

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err))}

  3. 获取矿工的 worker 地址。
    waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err))}

  4. 调用 Lotus Provider 适配器的 EnsureFunds 方法,确保矿工有足够的资金来处理当前交易。
    mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err))}

  5. 如果返回的 mcid 是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
    if mcid == cid.Undef {    return ctx.Trigger(storagemarket.ProviderEventFunded)}
  6. 否则,调用 fsm 上下文对象的 Trigger 方法,发送另一个事件。
    return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureProviderFunds 修改为 StorageDealProviderFunding,从而调用其处理函数 WaitForFunding 等待产一步的消息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。

5、`WaitForFunding` 函数

这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger 方法,发送一个事件。

函数内容如下:

node := environment.Node()

return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {    if err != nil {        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))    }    if code != exitcode.Ok {        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))    }    return ctx.Trigger(storagemarket.ProviderEventFunded)})

当状态机收到 ProviderEventFunded 这个事件后,经过事件处理器把状态从 StorageDealProviderFunding 修改为 StorageDealPublish,从而调用其处理函数 PublishDeal 把交易信息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。

6、`PublishDeal` 函数

这个函数主要用来提交交易信息上链。

  1. 生成矿工交易对象。
    smDeal := storagemarket.MinerDeal{
        Client:             deal.Client,
        ClientDealProposal: deal.ClientDealProposal,
        ProposalCid:        deal.ProposalCid,
        State:              deal.State,
        Ref:                deal.Ref,
    }
    
  2. 调用 Lotus Provider 适配器对象的 PublishDeals 把交易信息上链。
    mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal)
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err))
    }
    
  3. 调用 fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid)
    

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublish 修改为 StorageDealPublishing,从而调用其处理函数 WaitForPublish 等待交易信息上链。

7、`WaitForPublish` 函数

这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing 修改为 StorageDealStaged,从而调用其处理函数 HandoffDeal 开始扇区密封处理。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 ConnectionClosedDealID 属性。

return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
    }
    if code != exitcode.Ok {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
    }
    var retval market.PublishStorageDealsReturn
    err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
    if err != nil {
        return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
    }

    return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])})

8、`HandoffDeal` 函数

这个函数调用 miner 的 Provide 适配器的

  1. 使用碎片路径生成文件对象。
    file, err := environment.FileStore().Open(deal.PiecePath)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))}

  2. 使用碎片文件流生成碎片流。
    paddedReader, paddedSize := padreader.New(file, uint64(file.Size()))
  3. 调用 Lotus Provider 适配器对象的 OnDealComplete 方法,通知交易已经完成,从而把碎片加入某个扇区中。
    err = environment.Node().OnDealComplete(    ctx.Context(),    storagemarket.MinerDeal{        Client:             deal.Client,        ClientDealProposal: deal.ClientDealProposal,        ProposalCid:        deal.ProposalCid,        State:              deal.State,        Ref:                deal.Ref,        DealID:             deal.DealID,        FastRetrieval:      deal.FastRetrieval,  PiecePath:          filestore.Path(environment.FileStore().Filename(deal.PiecePath)),},    paddedSize,    paddedReader,)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)}

  4. 调用 fsm 上下文对象的 Trigger 方法,发送事件。
    return ctx.Trigger(storagemarket.ProviderEventDealHandedOff)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStaged 修改为 StorageDealSealing,从而调用其处理函数 VerifyDealActivated 等待扇区密封结果。

9、`VerifyDealActivated` 函数

  1. 生成回调函数。
    cb := func(err error) {    if err != nil {        _ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)    } else {        _ = ctx.Trigger(storagemarket.ProviderEventDealActivated)    }}

    当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealSealing 修改为 StorageDealActive,从而调用其处理函数 RecordPieceInfo 记录相关信息。

  2. 调用 Lotus Provider 适配器对象的 OnDealSectorCommitted 方法,等待扇区被提交。
    err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)

    if err != nil {    return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)}

  3. 返回空。
    return nil

9、`RecordPieceInfo` 函数

这个函数主要记录相关信息。

最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive 修改为 StorageDealCompleted,最终结束状态机处理。

这里会删除碎片的临时文件。

本文链接:https://www.8btc.com/article/632253
转载请注明文章出处