精通 Filecoin:Lotus真实数据处理之Provider初始化

因为 StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中,DI 容器会调用 StorageProvider 函数(node/modules/storageminer.go)来创建它。StorageProvider 函数流程如下:

  1. 调用 NewFromLibp2pHost 函数,生成 StorageMarketNetwork 对象。
    net := smnet.NewFromLibp2pHost(h)
    
  2. 调用 NewLocalFileStore 函数,生成 FileStore 存储对象。
    store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
    

    NewLocalFileStore 函数(go-fil-markets 类库 filestore/filestore.go)流程如下:

    base := filepath.Clean(string(basedirectory))
    info, err := os.Stat(string(base))

    if !info.IsDir() {    return nil, fmt.Errorf("%s is not a directory", base)}

    return &fileStore{string(base)}, nil

    NewLocalFileStore 函数使用的路径为仓库目录。即碎片的临时目录就是仓库目录。

  3. 调用 CustomDealDecisionLogic 函数,返回一个函数对象。在函数对象中调用我们提供的回调函数,进行自定义交易逻辑判断。
    opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {

    })

  4. 生成并返回 StorageProvider 对象。
    p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)

    return p, nil

    NewProvider 函数处理如下:

    • 生成 PieceIOWithStore 对象。
      carIO := cario.NewCarIO()
      pio := pieceio.NewPieceIOWithStore(carIO, fs, bs)
      
    • 生成 Provider 对象。
      h := &Provider{
          net:                  net,
          proofType:            rt,
          spn:                  spn,
          fs:                   fs,
          pio:                  pio,
          pieceStore:           pieceStore,
          conns:                connmanager.NewConnManager(),
          storedAsk:            storedAsk,
          actor:                minerAddress,
          dataTransfer:         dataTransfer,
          dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
          pubSub:               pubsub.New(providerDispatcher),
      }
      
    • 生成 fsm 状态组对象。
      deals, err := NewProviderStateMachine(
          ds,
          &providerDealEnvironment{h},
          h.dispatch,
      )

      h.deals = deals

      fsm 状态组对象使用的配置参数如下:

      return fsm.New(ds, fsm.Parameters{
          Environment:     env,
          StateType:       storagemarket.MinerDeal{},
          StateKeyField:   "State",
          Events:          providerstates.ProviderEvents,
          StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
          FinalityStates:  providerstates.ProviderFinalityStates,
          Notifier:        notifier,
      })
      
      • 环境对象为 providerDealEnvironment
      • 状态对象为 MinerDeal
      • 状态字段为 State
      • 事件集合为 ProviderEvents,参考 storagemarket/impl/providerstates/provider_fsm.go 文件。
      • 状态处理函数集合 为 ProviderStateEntryFuncs,状态机的状态处理器根据对应的状态获取到指定的函数进行处理。
      • 终止状态集合为 ProviderFinalityStates
      • 通知对象为 Provider 对象的 dispatch 方法。
    • 使用配置选项,配置 Provider 对象。
      h.Configure(options...)
      
    • 设置数据传输监听对象。
      dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))
      

      当开始数据传输、传输结束、传输错误时会发送 ProviderEventDataTransferInitiatedProviderEventDataTransferCompletedProviderEventDataTransferFailed 等事件到 fsm 状态组。

    • 返回 Provider 对象。


在存储矿工启动过程自动调用 HandleDeals 函数(node/modules/storageminer.go)。在这个函数中,调用 StorageProvider 对象的 Start 方法,从而启动这个对象。

Start 方法执行过程如下:

  1. 调用 StorageMarketNetwork 网络对象的 SetDelegate 设置代理/委托为自身。
    err := p.net.SetDelegate(p)
    

    网络对象的实现为 libp2pStorageMarketNetwork 结构体(storagemarket/network/libp2p_impl.go)。它的 SetDelegate 方法内容如下:

    impl.receiver = r
    impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream)
    impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream)
    return nil
    

    上面分别设置网络对象的 handleNewDealStream 方法处理 DealProtocolID 协议,表示存储;handleNewAskStream 方法 处理 AskProtocolID 协议,表示 ask。

    handleNewDealStream 方法内容如下:

    // 客户端 peer id
    remotePID := s.Conn().RemotePeer()  

    buffered := bufio.NewReaderSize(s, 16)

    // 对流进行包装ds := &dealStream{remotePID, impl.host, s, buffered}

    // 调用 StorageProvider 对象的 HandleDealStream 方法,处理客户端存储请求impl.receiver.HandleDealStream(ds)

  2. 在协程中调用 StorageProvider 对象的 restartDeals 方法,重新进行交易处理。restartDeals 方法流程如下:
    • 从 fsm 状态组对象中获取所有的交易对象。
      var deals []storagemarket.MinerDealerr := c.deals.List(&deals)
    • 遍历所有的交易对象,进行下面的处理:
      • 如果当前交易对象已经终止,则进行下一个处理。
      • 如果当前交易对象的连接已经关闭,则进行下一个处理。
      • 发送初始交易事件给 fsm 状态组。
        err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart)

        交易提案的 Cid 表示了状态机的名称/编号。

  3. 返回空值。


本文链接:https://www.8btc.com/article/630375
转载请注明文章出处

转载声明:本文 由CoinON抓取收录,观点仅代表作者本人,不代表CoinON资讯立场,CoinON不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。若以此作为投资依据,请自行承担全部责任。

声明:图文来源于网络,如有侵权请联系删除

风险提示:投资有风险,入市需谨慎。本资讯不作为投资理财建议。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 2020年8月3日 下午2:53
下一篇 2020年8月3日 下午3:54

相关推荐

精通 Filecoin:Lotus真实数据处理之Provider初始化

星期一 2020-08-03 15:03:03

因为 StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中,DI 容器会调用 StorageProvider 函数(node/modules/storageminer.go)来创建它。StorageProvider 函数流程如下:

  1. 调用 NewFromLibp2pHost 函数,生成 StorageMarketNetwork 对象。
    net := smnet.NewFromLibp2pHost(h)
    
  2. 调用 NewLocalFileStore 函数,生成 FileStore 存储对象。
    store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
    

    NewLocalFileStore 函数(go-fil-markets 类库 filestore/filestore.go)流程如下:

    base := filepath.Clean(string(basedirectory))
    info, err := os.Stat(string(base))

    if !info.IsDir() {    return nil, fmt.Errorf("%s is not a directory", base)}

    return &fileStore{string(base)}, nil

    NewLocalFileStore 函数使用的路径为仓库目录。即碎片的临时目录就是仓库目录。

  3. 调用 CustomDealDecisionLogic 函数,返回一个函数对象。在函数对象中调用我们提供的回调函数,进行自定义交易逻辑判断。
    opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {

    })

  4. 生成并返回 StorageProvider 对象。
    p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)

    return p, nil

    NewProvider 函数处理如下:

    • 生成 PieceIOWithStore 对象。
      carIO := cario.NewCarIO()
      pio := pieceio.NewPieceIOWithStore(carIO, fs, bs)
      
    • 生成 Provider 对象。
      h := &Provider{
          net:                  net,
          proofType:            rt,
          spn:                  spn,
          fs:                   fs,
          pio:                  pio,
          pieceStore:           pieceStore,
          conns:                connmanager.NewConnManager(),
          storedAsk:            storedAsk,
          actor:                minerAddress,
          dataTransfer:         dataTransfer,
          dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
          pubSub:               pubsub.New(providerDispatcher),
      }
      
    • 生成 fsm 状态组对象。
      deals, err := NewProviderStateMachine(
          ds,
          &providerDealEnvironment{h},
          h.dispatch,
      )

      h.deals = deals

      fsm 状态组对象使用的配置参数如下:

      return fsm.New(ds, fsm.Parameters{
          Environment:     env,
          StateType:       storagemarket.MinerDeal{},
          StateKeyField:   "State",
          Events:          providerstates.ProviderEvents,
          StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
          FinalityStates:  providerstates.ProviderFinalityStates,
          Notifier:        notifier,
      })
      
      • 环境对象为 providerDealEnvironment
      • 状态对象为 MinerDeal
      • 状态字段为 State
      • 事件集合为 ProviderEvents,参考 storagemarket/impl/providerstates/provider_fsm.go 文件。
      • 状态处理函数集合 为 ProviderStateEntryFuncs,状态机的状态处理器根据对应的状态获取到指定的函数进行处理。
      • 终止状态集合为 ProviderFinalityStates
      • 通知对象为 Provider 对象的 dispatch 方法。
    • 使用配置选项,配置 Provider 对象。
      h.Configure(options...)
      
    • 设置数据传输监听对象。
      dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))
      

      当开始数据传输、传输结束、传输错误时会发送 ProviderEventDataTransferInitiatedProviderEventDataTransferCompletedProviderEventDataTransferFailed 等事件到 fsm 状态组。

    • 返回 Provider 对象。


在存储矿工启动过程自动调用 HandleDeals 函数(node/modules/storageminer.go)。在这个函数中,调用 StorageProvider 对象的 Start 方法,从而启动这个对象。

Start 方法执行过程如下:

  1. 调用 StorageMarketNetwork 网络对象的 SetDelegate 设置代理/委托为自身。
    err := p.net.SetDelegate(p)
    

    网络对象的实现为 libp2pStorageMarketNetwork 结构体(storagemarket/network/libp2p_impl.go)。它的 SetDelegate 方法内容如下:

    impl.receiver = r
    impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream)
    impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream)
    return nil
    

    上面分别设置网络对象的 handleNewDealStream 方法处理 DealProtocolID 协议,表示存储;handleNewAskStream 方法 处理 AskProtocolID 协议,表示 ask。

    handleNewDealStream 方法内容如下:

    // 客户端 peer id
    remotePID := s.Conn().RemotePeer()  

    buffered := bufio.NewReaderSize(s, 16)

    // 对流进行包装ds := &dealStream{remotePID, impl.host, s, buffered}

    // 调用 StorageProvider 对象的 HandleDealStream 方法,处理客户端存储请求impl.receiver.HandleDealStream(ds)

  2. 在协程中调用 StorageProvider 对象的 restartDeals 方法,重新进行交易处理。restartDeals 方法流程如下:
    • 从 fsm 状态组对象中获取所有的交易对象。
      var deals []storagemarket.MinerDealerr := c.deals.List(&deals)
    • 遍历所有的交易对象,进行下面的处理:
      • 如果当前交易对象已经终止,则进行下一个处理。
      • 如果当前交易对象的连接已经关闭,则进行下一个处理。
      • 发送初始交易事件给 fsm 状态组。
        err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart)

        交易提案的 Cid 表示了状态机的名称/编号。

  3. 返回空值。


本文链接:https://www.8btc.com/article/630375
转载请注明文章出处