Architecture

Architecture Flow of Processing Steps

This section provides an overview of the Data Processor architecture and explains its internal pipeline. It is not intended to cover how to integrate with the Data Processor; for API integration, please refer to the API documentation or the Getting Started guide.

Overview

Processing Steps

1. Request with Intended Task Serialization

First, define the Datalake and Task details for the result you intend to obtain from the Data Processor. The goal is to generate a byte representation of the intended Data Processor request parameters.

Request Off-Chain API

Using the Data Processor server, you can send a POST request containing the datalake and task parameters. The server processes the input and serializes it into the same byte representation, using the hdp-cli and the smart contract. Please refer to the API documentation for more details.
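For illustration only, such a request could look like the minimal TypeScript sketch below. The endpoint path, JSON field names, and the sampledProperty string are assumptions made for this example (the values mirror the Solidity example in the next subsection); the actual request schema is defined in the API documentation.

// Hypothetical sketch of an off-chain request to the Data Processor server.
// Endpoint path and payload field names are assumptions; see the API docs for the real schema.
const HDP_API_URL = "https://hdp-server.example.invalid"; // placeholder URL

async function submitRequest(): Promise<void> {
  const response = await fetch(`${HDP_API_URL}/submit-batch-query`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      datalake: {
        type: "BlockSampled",
        chainId: 11155111,
        blockRangeStart: 5260543,
        blockRangeEnd: 5260571,
        increment: 3,
        // Hypothetical string form of the sampled account property
        sampledProperty: "account.0x7f2C6f930306D3AA736B3A6C6A98f512F74036D4.balance",
      },
      task: { aggregateFnId: "sum" },
    }),
  });
  console.log(await response.json());
}

submitRequest().catch(console.error);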

Request On-Chain

Note: on-chain requests are not stable yet.

Defining the request in Solidity (for more dynamic inputs, see the hdp-solidity dynamic test):

// Data Processor request: sample an account property over a block range on Sepolia
BlockSampledDatalake datalake =
    BlockSampledDatalake({
        chainId: 11155111, // Sepolia
        blockRangeStart: 5260543,
        blockRangeEnd: 5260571,
        increment: 3, // sample every 3rd block in the range
        sampledProperty: BlockSampledDatalakeCodecs
            .encodeSampledPropertyForAccount(
                address(0x7f2C6f930306D3AA736B3A6C6A98f512F74036D4),
                uint8(1) // index of the account property to sample
            )
    });

// Sum the sampled values; no comparison operator is applied
ComputationalTask computationalTask =
    ComputationalTask({
        aggregateFnId: AggregateFn.SUM,
        operatorId: Operator.NONE,
        valueToCompare: uint256(0)
    });

2. Schedule Task On-Chain

Initially, you need to pass the byte representation of the Data Processor task into the Data Processor contract. Call the request function, which checks if the task is being processed for the first time and schedules the task. Since the on-chain request does not proactively move to the next step, we emit an event when this contract function is called. We use an event watcher to catch it and proceed to the next step of processing.

// The Data Processor server calls requestExecutionOfTaskWithBlockSampledDatalake before processing
hdp.requestExecutionOfTaskWithBlockSampledDatalake(
    datalake,
    computationalTask
);

This function call emits the following event, which the event watcher catches to proceed to the next step of data processing:

/// @notice Emitted when a new task is scheduled
event TaskWithBlockSampledDatalakeScheduled(BlockSampledDatalake datalake, ComputationalTask task);
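
The watcher itself lives off-chain. Below is a minimal sketch of such a watcher, assuming ethers.js v6; the RPC URL, contract address, and ABI file path are placeholders, and only the event name comes from the contract definition above.

import { readFileSync } from "node:fs";
import { Contract, WebSocketProvider } from "ethers";

// Sketch of an off-chain event watcher (placeholder RPC URL, address, and ABI path).
const provider = new WebSocketProvider("wss://rpc.example.invalid");
const hdpAddress = "0x0000000000000000000000000000000000000000"; // placeholder HdpExecutionStore
const hdpAbi = JSON.parse(readFileSync("./HdpExecutionStore.abi.json", "utf8"));

const hdp = new Contract(hdpAddress, hdpAbi, provider);

// When a task is scheduled on-chain, trigger the preprocessing step (step 3).
hdp.on("TaskWithBlockSampledDatalakeScheduled", (datalake: unknown, task: unknown) => {
  console.log("New task scheduled:", { datalake, task });
  // ...kick off `hdp run ...` here
});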

3. Data Processor Preprocess

For more detailed information on how this step works under the hood, please refer to the repository.

The following command executes the necessary steps:

hdp run --request-file ${REQUEST_FILE_PATH} -c ${cairoInputFilePath} -o ${generalOutputFile} -p ${pieFilePath}
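
In an automated setup, the same command can be invoked programmatically. The sketch below shows one way to do that from Node.js, with placeholder file paths; the flags are exactly those of the command above.

import { execFile } from "node:child_process";
import { promisify } from "node:util";

const run = promisify(execFile);

// Sketch: invoke the `hdp run` command shown above from a Node.js service.
// All file paths are placeholders.
async function preprocess(requestFilePath: string): Promise<void> {
  const { stdout } = await run("hdp", [
    "run",
    "--request-file", requestFilePath,
    "-c", "./cairo_input.json", // Cairo-compatible input file
    "-o", "./output.json",      // general output file
    "-p", "./program.pie",      // PIE file
  ]);
  console.log(stdout);
}

preprocess("./request.json").catch(console.error);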

1) Preprocess to Generate input.json File

The Data Processor preprocessor fetches all the proofs related to the requested datalake and precomputes the result if it involves preprocessable tasks. It then generates a Cairo-compatible formatted input file.

2) Run Data Processor Cairo Program Over Cairo VM to Generate a PIE File

Using the input file generated in the first step, we run the compiled Data Processor Cairo program through cairo-run. This process generates a PIE file that can be later processed through SHARP. For a detailed explanation of the Cairo program, refer to the relevant section.

3) Generate a Human-Readable Output File Containing All Results

If the batch contains a Cairo 1 custom compute module, we need to parse task results and the batched results root, then reformat the file to include proofs, tasks, and results that will be used on the server.

3-2. Run Cairo Program

The Data Processor Cairo program validates the results returned by the Data Processor preprocess step. The program runs in three stages:

  1. Verify Data: Currently, the Cairo Data Processor supports three types of on-chain data: Headers, Accounts, and Storage Slots. In the verification stage, the Cairo Data Processor performs the required inclusion proofs, ensuring the inputs are valid and can be found on-chain.

  2. Compute: Performs the specified computation over the verified data (e.g., calculating the average of a user's Ether balance).

  3. Pack: Once the results are computed, the task and result are added to a Merkle tree. This allows multiple tasks to be computed in one execution.
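
To give a feel for the packing stage without going into the Cairo implementation, the sketch below batches a few dummy task commitments into an OpenZeppelin Standard Merkle Tree, which matches the Standard Merkle Tree / standardLeafHash convention used by the on-chain verification in step 6; the commitments themselves are made-up values.

import { StandardMerkleTree } from "@openzeppelin/merkle-tree";

// Illustration of batching: several task commitments packed under one Merkle root,
// so a single proof pipeline covers the whole batch. Commitments are dummy values.
const taskCommitments: [string][] = [
  ["0x" + "11".repeat(32)],
  ["0x" + "22".repeat(32)],
];

// Each leaf is a single bytes32 task commitment.
const tasksTree = StandardMerkleTree.of(taskCommitments, ["bytes32"]);

console.log("Batched tasks root:", tasksTree.root);
// Inclusion proof for the first commitment, later checked on-chain in step 6.
console.log("Inclusion proof:", tasksTree.getProof(0));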

To run the Cairo Data Processor locally, the following is required:

  • Generate Cairo-compatible inputs via the Data Processor preprocess; these can be generated with the -c flag.

  • Set up a Python virtual environment and install dependencies, as outlined in the README.

  • Move inputs to src/hdp/hdp_input.json.

  • Run the Cairo Data Processor with make run, then select hdp.cairo.

4. Cache MMR

The Data Processor preprocess result indicates which Merkle Mountain Range (MMR) containing the relevant datalakes is needed for the task. As Herodotus keeps aggregating new blocks and merging them into the MMR, there is a chance that the relevant MMR state (root and size) has been modified. For more information on block aggregation, check out this blog. To ensure the MMR used for the proofs is still valid when the Cairo computation is verified, we cache the MMR state fetched during the Data Processor preprocessing step in the smart contract; the cached root can then be read back with the load function below.

/// @notice Load MMR root from cache with given mmrId and mmrSize
function loadMmrRoot(
    uint256 mmrId,
    uint256 mmrSize
) public view returns (bytes32) {
    return cachedMMRsRoots[mmrId][mmrSize];
}
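
For illustration, the cached root can be read off-chain with a simple view call. A sketch assuming ethers.js v6, with a placeholder contract address and placeholder mmrId / mmrSize values (only the loadMmrRoot signature comes from the contract above):

import { Contract, JsonRpcProvider } from "ethers";

// Sketch: check that the MMR state used by the preprocess step is cached on-chain.
// RPC URL, contract address, and numeric values are placeholders.
const provider = new JsonRpcProvider("https://rpc.example.invalid");
const hdp = new Contract(
  "0x0000000000000000000000000000000000000000", // placeholder HdpExecutionStore
  ["function loadMmrRoot(uint256 mmrId, uint256 mmrSize) view returns (bytes32)"],
  provider
);

async function main(): Promise<void> {
  const expectedRoot = "0x" + "ab".repeat(32); // root reported by the preprocess step (dummy)
  const cachedRoot = await hdp.loadMmrRoot(26n, 1048576n); // placeholder mmrId / mmrSize
  if (cachedRoot !== expectedRoot) {
    throw new Error("MMR root not cached on-chain yet; cache it before authenticating the task");
  }
}

main().catch(console.error);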

5. Check Fact Finalization (Off-Chain)

The validity of a Cairo program execution is represented as a fact hash. After the Cairo program finishes running and its proof is verified, SHARP registers this fact hash in its FactRegistry contract. Once the fact has been registered in the Fact Registry, the process can move on to its final step.

To check fact finalization, we run a cron job for the targeted tasks and call the isValid method with the expected fact hash.
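
A sketch of such a check, assuming ethers.js v6: the fact hash is recomputed exactly as in the Solidity snippet in step 6 below, while the registry address, program hash, and program output values are placeholders (PROGRAM_HASH is assumed to be a uint256).

import { AbiCoder, Contract, JsonRpcProvider, keccak256, solidityPacked } from "ethers";

// Sketch of the off-chain fact finalization check (placeholder address and values).
const provider = new JsonRpcProvider("https://rpc.example.invalid");
const factsRegistry = new Contract(
  "0x0000000000000000000000000000000000000000", // placeholder SHARP FactRegistry
  ["function isValid(bytes32 fact) view returns (bool)"],
  provider
);

// Mirror of the on-chain computation shown in step 6.
const PROGRAM_HASH = 123n;          // placeholder Cairo program hash (assumed uint256)
const programOutput: bigint[] = []; // placeholder program output words
const programOutputHash = keccak256(solidityPacked(["uint256[]"], [programOutput]));
const gpsFactHash = keccak256(
  AbiCoder.defaultAbiCoder().encode(["uint256", "bytes32"], [PROGRAM_HASH, programOutputHash])
);

// Poll until SHARP registers the fact, then proceed to step 6.
async function waitForFact(): Promise<void> {
  while (!(await factsRegistry.isValid(gpsFactHash))) {
    await new Promise<void>((resolve) => setTimeout(resolve, 60_000)); // re-check every minute
  }
  console.log("Fact registered:", gpsFactHash);
}

waitForFact().catch(console.error);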

6. Authenticate Task Execution

After verifying that the fact hash is registered in the Fact Registry, we proceed to authenticate the task execution. This involves verifying the outputs and ensuring they match the expected results.

// Initialize an array of uint256 to store the program output
uint256[] memory programOutput = new uint256[](4 + mmrIds.length * 4);

// Assign values to the program output array
programOutput[0] = resultMerkleRootLow;
programOutput[1] = resultMerkleRootHigh;
programOutput[2] = taskMerkleRootLow;
programOutput[3] = taskMerkleRootHigh;

for (uint8 i = 0; i < mmrIds.length; i++) {
    bytes32 usedMmrRoot = loadMmrRoot(mmrIds[i], mmrSizes[i]);
    programOutput[4 + i * 4] = mmrIds[i];
    programOutput[4 + i * 4 + 1] = mmrSizes[i];
    programOutput[4 + i * 4 + 2] = CHAIN_ID;
    programOutput[4 + i * 4 + 3] = uint256(usedMmrRoot);
}

// Compute program output hash
bytes32 programOutputHash = keccak256(abi.encodePacked(programOutput));

// Compute GPS fact hash
bytes32 gpsFactHash = keccak256(
    abi.encode(PROGRAM_HASH, programOutputHash)
);

// Ensure GPS fact is registered
require(
    SHARP_FACTS_REGISTRY.isValid(gpsFactHash),
    "HdpExecutionStore: GPS fact is not registered"
);

Next, given the inclusion proofs and the roots of the Standard Merkle Trees of tasks and results, we verify each proof against its root to check that the task and its result are included in the batch.

// Compute the Merkle leaf of the task
bytes32 taskCommitment = taskCommitments[i];
bytes32 taskMerkleLeaf = standardLeafHash(taskCommitment);
// Ensure that the task is included in the batch by verifying the Merkle proof
bool isVerifiedTask = taskInclusionProof.verify(
    scheduledTasksBatchMerkleRoot,
    taskMerkleLeaf
);

// Compute the Merkle leaf of the task result
bytes32 taskResultCommitment = keccak256(
    abi.encode(taskCommitment, computationalTaskResult)
);
bytes32 taskResultMerkleLeaf = standardLeafHash(taskResultCommitment);
// Ensure that the task result is included in the batch by verifying the Merkle proof
bool isVerifiedResult = resultInclusionProof.verify(
    batchResultsMerkleRoot,
    taskResultMerkleLeaf
);

After these two validation steps, we can confirm that the targeted batched tasks have passed all the Data Processor steps, and the result can be stored on-chain. The result, computed off-chain and verified with a ZK proof, is then accessible from the on-chain mapping.

// Store the task result
cachedTasksResult[taskCommitment] = TaskResult({
    status: TaskStatus.FINALIZED,
    result: computationalTaskResult
});
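
From there, consumers can read the finalized value back with a view call. A minimal sketch assuming ethers.js v6 and a result getter on the execution store (the getter name, contract address, and commitment below are hypothetical):

import { Contract, JsonRpcProvider } from "ethers";

// Sketch: read a finalized task result back from the on-chain store.
// The getter name, contract address, and commitment are placeholders.
const provider = new JsonRpcProvider("https://rpc.example.invalid");
const hdp = new Contract(
  "0x0000000000000000000000000000000000000000", // placeholder HdpExecutionStore
  ["function getFinalizedTaskResult(bytes32 taskCommitment) view returns (uint256)"],
  provider
);

async function main(): Promise<void> {
  const taskCommitment = "0x" + "11".repeat(32); // commitment of the scheduled task (dummy)
  const result = await hdp.getFinalizedTaskResult(taskCommitment);
  console.log("Finalized result:", result);
}

main().catch(console.error);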

Using Data Processor Synchronously

Note: Turbo is under development. This section will be updated very soon.
