Mastering global data products for efficient data pipelines in Mage AI

September 19, 2024 · 10-minute read

Cole Freeman

TLDR

The article introduces global data products as a solution for managing and reusing data outputs across multiple data pipelines efficiently. A global data product is any data generated within a pipeline that is registered under a unique identifier and made accessible throughout the entire project. This approach promotes reusability by allowing different pipelines to access the same data without redundant computations, enhancing resource utilization and ensuring consistency across data outputs. By leveraging global data products, organizations can streamline their data workflows, reduce computational overhead, and maintain up-to-date, consistent data for analysis and decision-making.

Outline

  • Introduction

  • What are global data products?

  • Reusability across pipelines

  • Efficient resource utilization

  • Lazy triggering mechanism

  • Configuring global data products

    • Step 1: Create product purchase data

    • Step 2: Registration process

    • Step 3: Set outdated after values

    • Step 4: Set outdated starting at values

    • Step 5: Determine partitions for block data to output

  • Integrating global data products into pipelines

  • Overriding global settings

  • Benefits of using global data products

  • Conclusion

Introduction

In today's data-driven world, efficient data management and processing are crucial for businesses to make informed decisions. Data pipelines play a vital role in extracting, transforming, and loading data from various sources to destinations where it can be analyzed. However, as data pipelines become more complex, managing and orchestrating the final outputs—also known as data products—can be challenging. Without efficient orchestration, businesses may face issues such as redundant computations, inconsistencies in data, and a drain on resources. This is where global data products come into play, offering a streamlined approach to generate, manage, and reuse data outputs across multiple pipelines.

What are global data products?

A data product is any piece of data generated by one or more components within a data pipeline. This could be anything from an in-memory DataFrame or a JSON structure to a table in a database. Essentially, it's the end result of your data processing steps, ready for consumption.

A global data product elevates this concept by making the data product accessible across the entire project. It is registered under a unique identifier (UUID) and references an existing pipeline. This global accessibility means that any pipeline within the project can reference and utilize the data product without needing to regenerate it.

Reusability across pipelines

One of the main advantages of global data products is their reusability. In complex projects, multiple pipelines might require the same data output. Without global data products, each pipeline would need to generate the data independently, leading to redundant computations and increased processing time.

Example scenario:

Imagine you have a computationally intensive pipeline named users_ltv that calculates the lifetime value (LTV) of users. Two other pipelines depend on this LTV data. By registering users_ltv as a global data product, both pipelines can access the LTV data without triggering separate runs of the users_ltv pipeline. This not only saves computational resources but also ensures consistency across your data outputs.

Efficient resource utilization

Global data products are designed to minimize unnecessary runs. They come with configurable settings that determine when the data product is considered outdated and needs regeneration. If the data is still fresh and valid, the system will avoid re-running the pipeline, thereby conserving computational resources.

Lazy triggering mechanism

The lazy triggering feature ensures that a global data product is only generated when required. If no pipeline or block depends on it, it remains idle. However, once a downstream process requests the data, the global data product checks its validity. If it's outdated, it triggers a regeneration; if not, it serves the existing data.
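
In pseudocode, the decision looks roughly like the sketch below. This is purely illustrative; the function and field names here are not Mage's internal API.

from datetime import datetime, timedelta

def get_data_product(product, regenerate):
    """Serve the cached output if it is still fresh; otherwise re-run the pipeline."""
    age = datetime.utcnow() - product['last_run_completed_at']
    if age > timedelta(seconds=product['outdated_after_seconds']):
        product['data'] = regenerate()  # outdated: trigger a pipeline run
        product['last_run_completed_at'] = datetime.utcnow()
    return product['data']  # fresh: reuse the existing output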

Configuring global data products

To harness the power of global data products, understanding how to configure them is essential. Here are the key settings and how they influence the behavior of your data products.

Step 1: Create product purchase data

Use the code below to create some fake product purchase data that will be used for your global data product:

  1. Create a new pipeline in Mage by navigating to the pipelines page and clicking the “+ New” button

  2. Give the pipeline a name

  3. Create a new data_loader block by clicking “All blocks” and then selecting the “Base template (generic)” data_loader template

  4. Enter the code below into the block and run it

import random

import pandas as pd
from faker import Faker  # requires the Faker package: pip install Faker

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data(*args, **kwargs):
    # Initialize Faker
    fake = Faker()
    # Define the number of records to generate
    num_records = 100
    # Sample data for products
    products = [
        {'product_id': 'P2001', 'product_name': 'Wireless Mouse', 'unit_price': 25.00},
        {'product_id': 'P2002', 'product_name': 'Mechanical Keyboard', 'unit_price': 80.00},
        {'product_id': 'P2003', 'product_name': '27" Monitor', 'unit_price': 300.00},
        {'product_id': 'P2004', 'product_name': 'USB-C Hub', 'unit_price': 20.00},
        {'product_id': 'P2005', 'product_name': 'External Hard Drive', 'unit_price': 100.00}
    ]
    # Prepare empty lists to store data
    transaction_ids = []
    timestamps = []
    customer_ids = []
    product_ids = []
    product_names = []
    quantities = []
    unit_prices = []
    total_prices = []
    # Generate synthetic data
    for i in range(1, num_records + 1):
        transaction_ids.append(f"T{i:05d}")
        timestamp = fake.date_time_between(start_date='-1y', end_date='now')
        timestamps.append(timestamp)
        customer_ids.append(f"C{random.randint(1000, 9999)}")
        product = random.choice(products)
        product_ids.append(product['product_id'])
        product_names.append(product['product_name'])
        quantity = random.randint(1, 5)
        quantities.append(quantity)
        unit_price = product['unit_price']
        unit_prices.append(unit_price)
        total_price = unit_price * quantity
        total_prices.append(total_price)
    # Create a DataFrame
    data = {
        'Transaction ID': transaction_ids, 'Timestamp': timestamps, 'Customer ID': customer_ids,
        'Product ID': product_ids, 'Product Name': product_names, 'Quantity': quantities, 'Unit Price': unit_prices,
        'Total Price': total_prices
    }
    df = pd.DataFrame(data)
    # Sort the DataFrame by Timestamp
    df = df.sort_values('Timestamp').reset_index(drop=True)

    return df
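
Optionally, since the template already imports Mage's test decorator, you can append a lightweight sanity check below the loader. This is a minimal example; tailor the assertions to your own data.

@test
def test_output(output, *args) -> None:
    # Basic sanity checks on the synthetic purchase data
    assert output is not None, 'The output is undefined'
    assert len(output) == 100, 'Expected 100 synthetic records'
    assert (output['Total Price'] == output['Quantity'] * output['Unit Price']).all(), \
        'Total Price should equal Quantity x Unit Price'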

Step 2: Registration process

Registration requires at least one existing pipeline, which you created in Step 1. To register a global data product:

  • Navigate back to the Pipelines page, then select “Global data products” from the left popout menu.

  • Click “New global data product”.

  • Provide a unique UUID for the data product.

  • Select the object type. Currently, pipelines are supported, but block object types are on the roadmap.

  • Choose the pipeline you wish to register.

  • Click the “Create global data product” button.

This process effectively registers your pipeline's output as a globally accessible data product.
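
Behind the scenes, Mage persists registrations in a project-level YAML file (global_data_products.yaml). As a rough sketch of what a fully configured entry carries, here is the users_ltv example after completing Steps 3 through 5 below, rendered as a Python dict for consistency with this post. The key names are assumptions based on the UI labels, so treat them as illustrative rather than a guaranteed schema.

# Illustrative only: approximates the shape of a global data product entry.
users_ltv_gdp = {
    'object_type': 'pipeline',                   # block object types are on the roadmap
    'object_uuid': 'users_ltv',                  # the registered pipeline
    'outdated_after': {'seconds': 120},          # Step 3: freshness window
    'outdated_starting_at': {'hour_of_day': 0},  # Step 4: refresh at midnight
    'settings': {
        'load_data': {'partitions': 1},          # Step 5: hypothetical block name
    },
}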

Step 3: Set outdated after values

This setting determines how long your data product remains valid. You can specify a duration in seconds, minutes, hours, days, weeks, months, or years, after which the data product is considered outdated, measured from its last successful run.

Use case:

If your users_ltv data becomes less accurate after 2 minutes due to near-real-time transaction updates, you can set the "Outdated after" parameter to 120 seconds. This ensures that any requests for the LTV data after this period will trigger a regeneration of the data product.

Step 4: Set outdated starting at values

Sometimes you might want the data product to refresh at specific times, regardless of when it last ran. The "Outdated starting at" setting allows you to specify exact times, down to the minute, when the data product should be considered outdated.

Use case:

Suppose you want the LTV calculations to be refreshed every day at midnight, when all daily transactions are expected to be recorded. You can set the "Hour of day" parameter to 0. Even if the data product hasn't reached its "Outdated after" threshold, it will still be considered outdated at this specific time.
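
Conceptually, the check behaves like the sketch below. This is illustrative, not Mage's internal implementation.

from datetime import datetime

def outdated_at_midnight(last_run_completed_at: datetime, now: datetime) -> bool:
    # With "Hour of day" set to 0, the product is stale as soon as a new day
    # begins after the last successful run, regardless of "Outdated after".
    return now.date() > last_run_completed_at.date()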

Step 5: Determine partitions for block data to output

This setting allows you to specify which blocks within your pipeline provide the data for the global data product. You can also control how many partitions of data to return when the data product is requested:

  • Most recent partition: setting the Partitions value to 1 returns the most recent partition.

  • Multiple partitions: specify a number (e.g., 5) to get data from the five most recent partitions.

  • All partitions: setting the value to 0 returns data from all available partitions.
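
The partition count behaves roughly like the selection below. This is a simplified sketch; Mage tracks partitions internally, so the list-based helper here is purely illustrative.

def select_partitions(partition_keys, n):
    """Return the n most recent partitions; n == 0 means all partitions."""
    ordered = sorted(partition_keys)  # assumes sortable keys, e.g. ISO dates
    return ordered if n == 0 else ordered[-n:]

select_partitions(['2024-09-17', '2024-09-18', '2024-09-19'], 1)
# -> ['2024-09-19']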

Integrating global data products into pipelines

After setting up your global data product, integrating it into other pipelines is straightforward.

Step 1: Add a new pipeline to your project.

Step 2: In your target pipeline, add a new “Global data product” block.

Step 3: Select the desired global data product from the list. Once selected, the global data product becomes part of your pipeline and can interact with other blocks.

Step 4: Add a data exporter block to the pipeline (a minimal example follows these steps).

Step 5: Create a trigger by navigating to the Triggers page using the left popout menu.

Step 6: Click the “Run@once” button to create a new single-run trigger.

Step 7: Click the “Run now” button on the “Run pipeline now” popup to run the trigger.
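
For Step 4, a generic data exporter can be as simple as the sketch below. The CSV destination and filename are just examples; Mage also ships exporter templates for databases, warehouses, and object stores.

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data(df, *args, **kwargs):
    # df is the DataFrame served by the upstream global data product block
    df.to_csv('product_purchases.csv', index=False)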

By completing these steps, you've successfully integrated the global data product into your new pipeline and executed it. This integration demonstrates how global data products can streamline your data workflows by allowing easy reuse and management of data outputs across multiple pipelines. With this setup, you can enhance efficiency, maintain consistency, and optimize your data operations within your project.

Overriding global settings

When you add a global data product block to a pipeline, you can also override the product's global settings, such as the "Outdated after" window, for that pipeline alone. This is useful when one consumer needs fresher data than the rest without changing the behavior for every other pipeline.

Benefits of using global data products

By centralizing the generation of critical data products, you ensure that all pipelines consume the same version of the data. This consistency is vital for accurate reporting and analysis.

Avoiding redundant data processing saves computational resources and reduces costs. Especially for resource-intensive pipelines, this optimization can lead to significant efficiency gains.

Global data products simplify the orchestration of complex data pipelines. You can easily track dependencies and understand how data flows through different parts of your project.

Conclusion

Global data products offer a powerful way to optimize data pipelines by promoting reusability, efficiency, and consistency. By understanding and leveraging their features, you can streamline your data workflows, reduce computational overhead, and ensure that your data consumers always have access to the most accurate and up-to-date information.

Whether you're dealing with complex data transformations or simply looking to improve your data pipeline management, integrating global data products into your strategy is a step toward more efficient and effective data operations.

Are you ready to revolutionize your data pipelines and elevate your data operations? Check out Mage Pro, where you get all the benefits of Mage Open Source, including global data products and managed cloud infrastructure.
