Architecture

Architecture flow of processing steps

Overview

Processing Steps

1. Request with intended task serialization

First, define the datalake and the task that describe the result you want from the Data Processor. The goal is to generate a bytes representation of the intended Data Processor request parameters.

Request on-chain

You can also check more dynamic inputs in the hdp-solidity dynamic test.

To define the request in Solidity:

// Data Processor request
BlockSampledDatalake datalake =
    BlockSampledDatalake({
        blockRangeStart: 5260543,
        blockRangeEnd: 5260571,
        increment: 3,
        sampledProperty: BlockSampledDatalakeCodecs
            .encodeSampledPropertyForAccount(
                address(0x7f2C6f930306D3AA736B3A6C6A98f512F74036D4),
                uint8(1)
            )
    });

ComputationalTask computationalTask =
    ComputationalTask({
        aggregateFnId: AggregateFn.SUM,
        operatorId: Operator.NONE,
        valueToCompare: uint256(0)
    });
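
To make the datalake parameters concrete, the following Python sketch lists the block numbers that the range above would sample. It assumes that blockRangeEnd is inclusive and that sampling starts at blockRangeStart and advances by increment; the helper name sampled_blocks is hypothetical and not part of any Data Processor package.

```python
def sampled_blocks(block_range_start: int, block_range_end: int, increment: int) -> list[int]:
    """Return the block numbers covered by a block-sampled datalake.

    Assumption: the end of the range is inclusive.
    """
    if increment <= 0:
        raise ValueError("increment must be positive")
    if block_range_end < block_range_start:
        raise ValueError("block_range_end must not precede block_range_start")
    return list(range(block_range_start, block_range_end + 1, increment))


# Same parameters as the Solidity datalake above.
blocks = sampled_blocks(5260543, 5260571, 3)
print(len(blocks), blocks[0], blocks[-1])
```

For the range above this gives 10 sampled blocks, from 5260543 to 5260570. The inclusive-end assumption does not change the result here, because 5260571 does not fall on the sampling grid.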

Request off-chain

To define the request using hdp-cli encoding:

hdp encode -a "sum" -b 5260543 5260571 "account.0x7f2c6f930306d3aa736b3a6c6a98f512f74036d4.balance" 3
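
The sampled property argument packs three pieces of information into one dotted string. As an illustration only, this Python sketch splits the string used above into its parts; the function parse_account_property and the AccountProperty type are hypothetical, and only the account form shown in this example is handled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountProperty:
    address: str  # 0x-prefixed, 20-byte hex address
    field: str    # account field to sample, e.g. "balance"


def parse_account_property(text: str) -> AccountProperty:
    """Split "account.<address>.<field>" into its components."""
    parts = text.split(".")
    if len(parts) != 3 or parts[0] != "account":
        raise ValueError(f"expected 'account.<address>.<field>', got {text!r}")
    _, address, field = parts
    hex_digits = address[2:]
    if not address.startswith("0x") or len(hex_digits) != 40:
        raise ValueError(f"not a 20-byte hex address: {address!r}")
    int(hex_digits, 16)  # raises ValueError on non-hex characters
    return AccountProperty(address=address.lower(), field=field)


prop = parse_account_property(
    "account.0x7f2c6f930306d3aa736b3a6c6a98f512f74036d4.balance"
)
print(prop.field, prop.address)
```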

To define the request using an API request:

You can send a POST request to the Data Processor server with parameters like the ones shown above. The server processes the input and serializes it into the same bytes representation produced by hdp-cli and the smart contract. Please refer to the API documentation.

2. Schedule Task on-chain (on-chain request)

First, pass the bytes representation of the Data Processor task to the Data Processor contract by calling the request function. This function checks whether the task is being processed for the first time and, if so, schedules it. Because an on-chain request cannot by itself trigger the next step, the contract emits an event when the function is called. An event watcher catches this event and moves processing on to the next step.

// The Data Processor server calls `requestExecutionOfTaskWithBlockSampledDatalake` before processing
hdp.requestExecutionOfTaskWithBlockSampledDatalake(
    datalake,
    computationalTask
);

The function call emits the following event, which the event watcher catches in order to proceed to the next step of data processing.

/// @notice emitted when a new task is scheduled
event TaskWithBlockSampledDatalakeScheduled(BlockSampledDatalake datalake, ComputationalTask task);
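
The scheduling flow described above can be modeled off-chain in a few lines. The Python sketch below is an in-memory stand-in, not the contract or the actual event watcher: the class FakeDataProcessor, the status names NONE and SCHEDULED, and the string task commitments are all assumptions made for illustration.

```python
from collections import deque
from enum import Enum


class TaskStatus(Enum):
    NONE = 0        # assumed name for "never requested"
    SCHEDULED = 1   # assumed name for "requested, not yet finalized"


class FakeDataProcessor:
    """In-memory stand-in for the contract's request function."""

    def __init__(self) -> None:
        self.status: dict[str, TaskStatus] = {}
        self.events: deque[tuple[str, str]] = deque()

    def request(self, task_commitment: str) -> None:
        # Only a first-time task may be scheduled.
        if self.status.get(task_commitment, TaskStatus.NONE) is not TaskStatus.NONE:
            raise RuntimeError("task was already requested")
        self.status[task_commitment] = TaskStatus.SCHEDULED
        # The contract cannot trigger off-chain work itself, so it emits an event.
        self.events.append(("TaskWithBlockSampledDatalakeScheduled", task_commitment))


def watch(contract: FakeDataProcessor) -> list[str]:
    """Drain pending events and return the tasks to preprocess next."""
    to_process = []
    while contract.events:
        name, task_commitment = contract.events.popleft()
        if name == "TaskWithBlockSampledDatalakeScheduled":
            to_process.append(task_commitment)
    return to_process


contract = FakeDataProcessor()
contract.request("task-1")
scheduled = watch(contract)
print(scheduled)
```

Requesting the same task a second time raises an error in this model, which corresponds to the first-time check performed before scheduling.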

3. Data Processor Preprocess

For more detailed information on how this step works under the hood, please refer to the repository.

The following command executes the necessary steps:

hdp run ${tasksSerialized} ${datalakesSerialized} ${rpc_url} -o ${generalOutputFile} -p ${pieFilePath} -c ${cairoInputFilePath}

1) Preprocess to Generate input.json File

The Data Processor preprocessor fetches all the proofs related to the requested datalake and precomputes the result if it involves preprocessable tasks. It then generates a Cairo-compatible formatted input file.

2) Run Data Processor Cairo Program Over cairo_vm with the Given Input File to Generate a PIE File

Using the input file generated in the first step, we run the compiled Data Processor Cairo program through cairo-run. This process generates a PIE file that can be later processed through SHARP. For a detailed explanation of the Cairo Program, refer to this section.

3) Generate a Human-Readable Output File Containing All Results

If the batch contains a Cairo 1 custom compute module, we need to parse task results and the batched results root, then reformat the file to include proofs, tasks, and results that will be used on the server.
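
For scripting, the hdp run invocation shown above can be assembled programmatically. This Python sketch only builds the argument list, using the positional arguments and the -o, -p, and -c flags exactly as they appear in the command above; the function name and the example values are placeholders, and nothing is executed.

```python
import shlex


def build_hdp_run_command(
    tasks_serialized: str,
    datalakes_serialized: str,
    rpc_url: str,
    output_file: str,
    pie_file: str,
    cairo_input_file: str,
) -> list[str]:
    """Assemble the argv for `hdp run` in the order used in this document."""
    return [
        "hdp", "run",
        tasks_serialized,
        datalakes_serialized,
        rpc_url,
        "-o", output_file,       # general output file
        "-p", pie_file,          # PIE file path
        "-c", cairo_input_file,  # Cairo-compatible input file path
    ]


argv = build_hdp_run_command(
    "0xTASKS", "0xDATALAKES", "https://rpc.example",  # placeholder values
    "output.json", "hdp.pie", "input.json",
)
print(shlex.join(argv))
```

Keeping the command as a list rather than a single string avoids quoting problems if it is later passed to a process launcher.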

3-2. Run Cairo Program

The Data Processor Cairo program validates the results returned by the Data Processor preprocess step. The program runs in 3 stages.

  1. Verify Data: Currently, Cairo Data Processor supports three types of on-chain data: Headers, Accounts, and Storage Slots. In the verification stage, Cairo Data Processor performs the required inclusion proofs, ensuring the inputs are valid and can be found on-chain.

  2. Compute: Perform the specified computation over the verified data, e.g. the average of a user's Ether balance.

  3. Pack: Once the results are computed, the task and result are added to a Merkle tree. This allows multiple tasks to be computed in one execution.
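
The Compute stage can be pictured as applying an aggregate function to values that have already passed verification. The Python sketch below is a plain illustration rather than the Cairo implementation: the function aggregate is hypothetical, only sum and avg are covered, and integer (floor) division for avg is an assumption.

```python
def aggregate(fn: str, values: list[int]) -> int:
    """Apply a named aggregate function to already-verified integer values."""
    if not values:
        raise ValueError("no values to aggregate")
    if fn == "sum":
        return sum(values)
    if fn == "avg":
        # Assumption: results stay integral, so the average is floored.
        return sum(values) // len(values)
    raise ValueError(f"unsupported aggregate function: {fn}")


# Hypothetical balances (in wei) sampled at three blocks.
balances = [100, 250, 400]
print(aggregate("sum", balances), aggregate("avg", balances))
```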

To run the Cairo Data Processor locally, the following is required:

  • Generate Cairo-compatible inputs via Data Processor preprocess. These can be generated with the -c flag.

  • Set up Python VENV & install dependencies, as outlined in the readme.

  • Move inputs to src/hdp/hdp_input.json.

  • Run the Cairo Data Processor with make run and then hdp.cairo.

4. Cache MMR

The Data Processor preprocess result indicates which MMR covers the datalakes relevant to the task. Because Herodotus keeps aggregating new blocks and merging them into the MMR, the relevant MMR state (root, size) may have changed since preprocessing. For more information on block aggregation, check out the blog. To ensure the MMR proofs remain valid when the computation runs in Cairo, we cache the MMR state fetched during the Data Processor preprocessing step; the cached root is then read by calling this load function in the smart contract.

/// @notice Load MMR root from cache with given mmrId and mmrSize
function loadMmrRoot(
    uint256 mmrId,
    uint256 mmrSize
) public view returns (bytes32) {
    return cachedMMRsRoots[mmrId][mmrSize];
}
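
The reason for keying the cache by both mmrId and mmrSize can be shown with a small model. In this Python sketch, which mirrors only the shape of cachedMMRsRoots, the class MmrRootCache and its cache method are hypothetical names and the roots are dummy byte strings.

```python
class MmrRootCache:
    """In-memory model of a mapping mmrId -> mmrSize -> root."""

    def __init__(self) -> None:
        self._roots: dict[int, dict[int, bytes]] = {}

    def cache(self, mmr_id: int, mmr_size: int, root: bytes) -> None:
        # Hypothetical write path; the document only shows the read path.
        self._roots.setdefault(mmr_id, {})[mmr_size] = root

    def load_mmr_root(self, mmr_id: int, mmr_size: int) -> bytes:
        # Like a Solidity mapping, an unknown key yields the zero value.
        return self._roots.get(mmr_id, {}).get(mmr_size, bytes(32))


cache = MmrRootCache()
cache.cache(1, 1000, b"\x11" * 32)   # state seen during preprocessing
cache.cache(1, 1007, b"\x22" * 32)   # MMR grew after new blocks were aggregated

# The root for the older size is still available for verification.
print(cache.load_mmr_root(1, 1000).hex()[:8])
```

Because each (mmrId, mmrSize) pair has its own entry, a later growth of the MMR does not overwrite the root that the preprocessing step relied on.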

5. Check Fact Finalization (off-chain)

The validation of one Cairo program run is represented as one fact hash. After the Cairo program finishes running, and if the run is indeed valid, this fact hash is sent to the FactRegistry contract of SHARP. Once the fact has been registered in the Fact Registry, the process can move on to its final step.

To check fact finalization, we run a cron job for the targeted tasks that calls the isValid method with the expected fact hash.
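
The periodic check described above might look like the following Python sketch. It assumes a callable that wraps the isValid call and returns a boolean; FakeRegistry, poll_facts, and the fact hash strings are all made up for the example, and a real job would be triggered on a schedule instead of being called by hand.

```python
from typing import Callable


def poll_facts(pending: set[str], is_valid: Callable[[str], bool]) -> set[str]:
    """Return the pending fact hashes that are now registered.

    Intended to be called once per cron tick; hashes that are not yet
    registered stay in `pending` for the next tick.
    """
    finalized = {fact for fact in pending if is_valid(fact)}
    pending -= finalized  # in-place update of the caller's set
    return finalized


class FakeRegistry:
    """Stand-in for the SHARP fact registry, for local experimentation."""

    def __init__(self) -> None:
        self.registered: set[str] = set()

    def is_valid(self, fact_hash: str) -> bool:
        return fact_hash in self.registered


registry = FakeRegistry()
pending = {"0xaaa", "0xbbb"}

first_tick = poll_facts(pending, registry.is_valid)   # nothing registered yet
registry.registered.add("0xaaa")                      # one fact gets registered
second_tick = poll_facts(pending, registry.is_valid)
print(first_tick, second_tick, pending)
```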

6. Authenticate Task Execution

As in the previous step, the validation of one Cairo program run is represented as one fact hash, which is sent to the FactRegistry contract of SHARP when the run is valid. First, before processing the final step of the Data Processor, we rebuild the expected fact hash from the program output and check that it has been registered in the Fact Registry by calling isValid.

// Load MMRs root
bytes32 usedMmrRoot = loadMmrRoot(usedMmrId, usedMmrSize);

// Initialize an array of uint256 to store the program output
uint256[] memory programOutput = new uint256[](6);

// Assign values to the program output array
programOutput[0] = uint256(usedMmrRoot);
programOutput[1] = usedMmrSize;
programOutput[2] = resultMerkleRootLow;
programOutput[3] = resultMerkleRootHigh;
programOutput[4] = taskMerkleRootLow;
programOutput[5] = taskMerkleRootHigh;

// Compute program output hash
bytes32 programOutputHash = keccak256(abi.encodePacked(programOutput));

// Compute GPS fact hash
bytes32 gpsFactHash = keccak256(
    abi.encode(PROGRAM_HASH, programOutputHash)
);

// Ensure GPS fact is registered
require(
    SHARP_FACTS_REGISTRY.isValid(gpsFactHash),
    "HdpExecutionStore: GPS fact is not registered"
);
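
The layout of programOutput is easier to see with concrete numbers. The Python sketch below rebuilds the six-element array and its packed encoding, assuming that each Low/High pair holds the lower and upper 128 bits of a 256-bit Merkle root; the helper names are hypothetical. The final keccak256 steps are left out because Python's standard library has no keccak256 (hashlib.sha3_256 uses different padding and gives different digests).

```python
MASK_128 = (1 << 128) - 1


def split_u256(value: int) -> tuple[int, int]:
    """Split a 256-bit integer into (low, high) 128-bit halves (assumed layout)."""
    if not 0 <= value < 1 << 256:
        raise ValueError("value does not fit in 256 bits")
    return value & MASK_128, value >> 128


def build_program_output(mmr_root: int, mmr_size: int,
                         results_root: int, tasks_root: int) -> list[int]:
    """Order the values the same way as the Solidity snippet above."""
    result_low, result_high = split_u256(results_root)
    task_low, task_high = split_u256(tasks_root)
    return [mmr_root, mmr_size, result_low, result_high, task_low, task_high]


def encode_packed_u256_array(values: list[int]) -> bytes:
    """abi.encodePacked on uint256[]: each element as a 32-byte big-endian word."""
    return b"".join(v.to_bytes(32, "big") for v in values)


# Dummy inputs, chosen only to make the split visible.
output = build_program_output(
    mmr_root=0xABC,
    mmr_size=1000,
    results_root=(7 << 128) | 5,
    tasks_root=(9 << 128) | 3,
)
packed = encode_packed_u256_array(output)
print(output[2:], len(packed))
```

With six elements, the packed encoding is 192 bytes long; this byte string is what the contract hashes to obtain programOutputHash.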

Second, given the inclusion proofs and the roots of the Standard Merkle Trees of tasks and results, we verify each proof against its root to check that the task and its result are included in the batch.

// Compute the Merkle leaf of the task
bytes32 taskCommitment = taskCommitments[i];
bytes32 taskMerkleLeaf = standardLeafHash(
    taskCommitment
);
// Ensure that the task is included in the batch, by verifying the Merkle proof
bool isVerifiedTask = taskInclusionProof.verify(
    scheduledTasksBatchMerkleRoot,
    taskMerkleLeaf
);

// Compute the Merkle leaf of the task result
bytes32 taskResultCommitment = keccak256(
    abi.encode(taskCommitment, computationalTaskResult)
);
bytes32 taskResultMerkleLeaf = standardLeafHash(
    taskResultCommitment
);
// Ensure that the task result is included in the batch, by verifying the Merkle proof
bool isVerifiedResult = resultInclusionProof.verify(
    batchResultsMerkleRoot,
    taskResultMerkleLeaf
);
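
The inclusion checks above follow the usual sorted-pair Merkle proof pattern. The Python sketch below shows that pattern on a four-leaf tree. It is a stand-in, not a drop-in equivalent: it uses hashlib.sha3_256 where the contract uses keccak256 (the two produce different digests), it assumes that standardLeafHash double-hashes its input, and the helpers leaf_hash, hash_pair, and verify are hypothetical names. Hashes from this code will not match on-chain values.

```python
import hashlib


def h(data: bytes) -> bytes:
    # Stand-in hash. The contract uses keccak256, which is NOT sha3_256.
    return hashlib.sha3_256(data).digest()


def leaf_hash(commitment: bytes) -> bytes:
    """Double-hash the leaf value (assumed standard-leaf convention)."""
    return h(h(commitment))


def hash_pair(a: bytes, b: bytes) -> bytes:
    """Hash two nodes in sorted order so proofs need no left/right flags."""
    return h(a + b) if a < b else h(b + a)


def verify(proof: list[bytes], root: bytes, leaf: bytes) -> bool:
    """Fold the proof into the leaf and compare the result with the root."""
    node = leaf
    for sibling in proof:
        node = hash_pair(node, sibling)
    return node == root


# Build a four-leaf tree from dummy 32-byte task commitments.
leaves = [leaf_hash(bytes([i]) * 32) for i in range(4)]
left, right = hash_pair(leaves[0], leaves[1]), hash_pair(leaves[2], leaves[3])
root = hash_pair(left, right)

# Proof for leaf 2: its sibling leaf, then the hash of the other subtree.
proof_for_leaf_2 = [leaves[3], left]
print(verify(proof_for_leaf_2, root, leaves[2]))
```

The same proof fails for any other leaf, which is what lets the contract conclude that a specific task or result belongs to the batch.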

After these two validation steps, we can confirm that the targeted, batched tasks have been verified through all the Data Processor steps, and we can store the result on-chain. The result, which was computed off-chain and verified with a ZK proof, can then be read from the on-chain mapping.

// Store the task result
cachedTasksResult[taskCommitment] = TaskResult({
    status: TaskStatus.FINALIZED,
    result: computationalTaskResult
});

Using Data Processor synchronously

Turbo is under development; this section will be updated very soon.
