
Managing LLM Pretraining Data Using Mosaic Data Sharding


Handling large-scale datasets efficiently is one of the biggest bottlenecks in modern machine learning workflows, especially when pre-training LLMs. As datasets grow in size and complexity, traditional methods like memmap arrays or PyTorch DataLoaders can struggle to keep up, particularly in distributed training environments. In this post, we will look at some of the key features of Mosaic MDS, see how to convert existing data into this format, and walk through a few toy and real-world examples to get familiar with its usage.

What is MDS?

The MosaicML team has put a lot of work into making some solid documentation, so I’d recommend you give that a read here. Mosaic Data Sharding (MDS) is the data format behind MosaicML’s Streaming library, a data loading library introduced in 2023 with this announcement. The library is set up to make training on large datasets in distributed environments as painless as possible. Indeed, there are many common issues when dealing with large datasets in large environments, such as:

  • Data egress costs – moving data in and out of cloud storage naively can rack up costs quickly
  • Determinism – changing the number of GPUs mid-project can change the order in which samples are drawn, which yields different loss curves from run to run
  • Crash recovery – if a large job crashes, resuming training from the same spot can be time-consuming, as we must churn through our existing data loaders to get back to the data point at which we stopped

These are just some of the issues we have dealt with when pre-training large language models at scale, but I’m sure there are others I’m forgetting to mention. 

Mosaic Data Streaming was set up to remedy the above issues with an easy-to-use codebase that closely follows standard PyTorch DataLoader syntax. The chart below is a nice illustration of what can happen when you change the number of GPUs during a training run, which, with standard data loaders, can introduce nondeterminism. With the Mosaic streaming package, however, the loss curve looks exactly the same regardless of the number of GPUs.

[Figure: training loss curves for different GPU counts overlap exactly when using the streaming dataset]
Taken from https://www.databricks.com/blog/mosaicml-streamingdataset

I won’t dwell too much on the benefits of the library here; there are plenty of resources out there that do so. For more information, check out the links provided above. For now, let's get into how to actually implement this code with some simple examples. 

Converting a Single File to MDS

Let’s start out getting familiar with the MDS library by converting a single file to MDS format. We will use a code snippet taken from the MDS repo to start:

import numpy as np
from PIL import Image
from streaming import MDSWriter

# Local or remote directory in which to store the compressed output files
data_dir = './MDS_Blog/'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'jpeg',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(100):
        sample = {
            'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
            'class': np.random.randint(10),
        }
        out.write(sample)
        

The above creates 100 small random-noise images and writes them, along with their class labels, to a local directory called “MDS_Blog”. That directory will contain one or more compressed .mds shard files plus an index.json file.

Go ahead and open up the index.json file. You'll see some metadata related to the files, including the data type expected in each column.
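If you want to poke at it programmatically, a quick sketch like the one below works; note that the exact fields may vary slightly between library versions.

import json

with open('./MDS_Blog/index.json') as f:
    index = json.load(f)

# The index records one entry per shard, including the column names,
# their encodings, and how many samples each shard holds
print(index['version'])
for shard in index['shards']:
    print(shard['column_names'], shard['samples'])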

Now we have some simple data created in MDS format. This can be transferred to S3 and then used in the Dataloader. 
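Once the shards are sitting in S3, a minimal loading sketch looks something like the following; the bucket name and cache path are hypothetical, and StreamingDataset pulls shards down to the local cache as they are needed.

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Stream shards from a (hypothetical) S3 prefix, caching them locally
dataset = StreamingDataset(
    remote='s3://my-bucket/MDS_Blog/',
    local='/tmp/mds_blog_cache',
    shuffle=True,
    batch_size=32,
)

# Each sample comes back as a dict keyed by the columns we defined above
sample = dataset[0]
print(sample['class'], sample['image'].size)

# StreamingDataset plugs directly into the standard PyTorch DataLoader
# (for the image column you would normally add a transform or collate_fn
# that converts PIL images to tensors before batching)
dataloader = DataLoader(dataset, batch_size=32)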

Let’s do something a little more complex, though. Let’s get into some dummy data for language modeling. If you’ve never seen the NanoGPT repo, I’d highly recommend you give it a read. In that repo, a transformer model is coded from scratch and then trained on pre-tokenized data.

Let’s start off small, using a file containing concatenated works from Shakespeare located here. We will download this data, tokenize it with an open-source tokenizer, and then save it to two files containing validation data and training data.

import os
import requests
import tiktoken
import numpy as np

# download the tiny shakespeare dataset
input_file_path = os.path.join(os.path.dirname(__file__), 'input.txt')
if not os.path.exists(input_file_path):
    data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
    with open(input_file_path, 'w', encoding='utf-8') as f:
        f.write(requests.get(data_url).text)

with open(input_file_path, 'r', encoding='utf-8') as f:
    data = f.read()
n = len(data)
train_data = data[:int(n*0.9)]
val_data = data[int(n*0.9):]

# encode with tiktoken gpt2 bpe
enc = tiktoken.get_encoding("gpt2")
train_ids = enc.encode_ordinary(train_data)
val_ids = enc.encode_ordinary(val_data)
print(f"train has {len(train_ids):,} tokens")
print(f"val has {len(val_ids):,} tokens")

# export to bin files
train_ids = np.array(train_ids, dtype=np.uint16)
val_ids = np.array(val_ids, dtype=np.uint16)
train_ids.tofile(os.path.join(os.path.dirname(__file__), 'train.bin'))
val_ids.tofile(os.path.join(os.path.dirname(__file__), 'val.bin'))

Okay, so now you should see two files, train.bin and val.bin, located wherever the code above ran. These files are raw numpy binary dumps; they can be read in the NanoGPT training loop as numpy memmap arrays, so they load from local disk without eating up all your RAM, which makes them ideal for training models.

Now, let’s convert those train and val files we just made into MDS format for use with our DataLoader instances. We will use a slightly altered version of the code above:

import time
from shutil import rmtree
from uuid import uuid4

import numpy as np
from streaming import MDSWriter

def convert_bin_to_mds(data_file, out_root, block_size):
    # data_file is the input file (.bin in our case)
    # out_root is a local directory to store the MDS output
    # block_size is our desired context length

    # Remove any existing MDS output directory
    rmtree(out_root, ignore_errors=True)
    
    # Define columns based on your dataset format
    columns = {
        'uuid': 'str',
        'x': 'ndarray',
        'y': 'ndarray'
    }
    
    compression = 'zstd'
    
    # Original memmap array
    data = np.memmap(data_file, dtype=np.uint16, mode="r")
    total_steps = len(data) - block_size
    
    # Writer to MDS format
    with MDSWriter(out=out_root, columns=columns, compression=compression) as out:
        start_time = time.time()  # Start timer
        
        for idx in range(total_steps):
            # Define a unique identifier and fetch sequences
            uuid = str(uuid4())
            x = data[idx: idx + block_size].astype(np.int64)
            y = data[idx + 1: idx + 1 + block_size].astype(np.int64)
            
            # Write each sample to MDS
            sample = {
                'uuid': uuid,
                'x': x, 
                'y': y   
            }
            out.write(sample)
            
            # Print progress every 10,000 iterations
            if idx % 10_000 == 0:
                elapsed_time = time.time() - start_time
                progress = (idx + 1) / total_steps
                remaining_time = (elapsed_time / progress) - elapsed_time
                
                print(f"Progress: {progress:.2%} | "
                      f"Elapsed Time: {elapsed_time:.2f}s | "
                      f"Estimated Remaining Time: {remaining_time:.2f}s")

You can run this function directly on the train.bin and val.bin files we just made; as it runs, it prints progress updates showing elapsed time and an estimated time remaining.

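A call with hypothetical output directories and a GPT-2-style context length would look like:

# Paths and block size are illustrative; adjust to your setup
convert_bin_to_mds('train.bin', './shakespeare_mds_train', block_size=1024)
convert_bin_to_mds('val.bin', './shakespeare_mds_val', block_size=1024)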

Now, you can easily use the MDS files in a train loop. Please note that you could also directly use the .bin files in a train loop by defining a simple wrapper class that allows the dataloaders to interact directly with the binary files. 

import numpy as np
import torch
from torch.utils.data import Dataset

class ExtensionDataset(Dataset):
    """
    PyTorch Dataset that wraps a pre-tokenized .bin file for use
    with the Mosaic Composer trainer.

    Args:
        data_file: path to the pre-tokenized binary file
        block_size: desired context length

    Returns:
        (x, y) LongTensor pairs, where y is x shifted by one token
    """
    def __init__(self, data_file: str, block_size: int):
        # Memory-mapped access to binary file
        self.data = np.memmap(data_file, dtype=np.uint16, mode="r")
        self.block_size = block_size

    def __len__(self):
        # Return the number of sequences in the dataset
        return len(self.data) - self.block_size

    def __getitem__(self, idx):
        # Load a single sequence and its corresponding target sequence
        x = torch.from_numpy(
            self.data[idx : idx + self.block_size].astype(np.int64)
            )
        y = torch.from_numpy(
            self.data[idx + 1 : idx + 1 + self.block_size].astype(np.int64)
            )
        return x, y

The above code block could be used in conjunction with the below snippet:

train_dataset = ExtensionDataset(
    data_file=os.path.join(config.data_dir, "train.bin"),
    block_size=config.model.block_size,
)

train_dataloader = DataLoader(train_dataset, batch_size=config.model.batch_size, shuffle=False)

So, we are able to use Mosaic Composer easily with either the binary files directly, or with the MDS data. 
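For the MDS side, one option (a sketch, not the only pattern) is to subclass StreamingDataset so each sample’s x and y arrays come back as tensors ready for the model; the local directory below is the hypothetical output of our conversion step, and once the data is in S3 you could add a remote='s3://...' argument to stream it instead.

import torch
from torch.utils.data import DataLoader
from streaming import StreamingDataset

class TokenStreamingDataset(StreamingDataset):
    """Streams the MDS shards we wrote above and returns (x, y) tensor pairs."""

    def __getitem__(self, idx):
        sample = super().__getitem__(idx)
        x = torch.tensor(sample['x'], dtype=torch.long)
        y = torch.tensor(sample['y'], dtype=torch.long)
        return x, y

train_dataset = TokenStreamingDataset(
    local='./shakespeare_mds_train',  # hypothetical local MDS directory
    shuffle=True,
    batch_size=12,
)

train_dataloader = DataLoader(train_dataset, batch_size=12)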

We can upload our new Shakespeare MDS data to S3 with something like:

import os
import boto3

def upload_to_s3(filepath: str, bucket_name: str, s3_file_key: str):
    try:
        s3 = boto3.client('s3')
        
        # Check if the filepath is a directory
        if os.path.isdir(filepath):
            # Iterate through all files in the directory
            for root, dirs, files in os.walk(filepath):
                for file in files:
                    # Construct full local file path
                    local_file_path = os.path.join(root, file)
                    
                    # Create the corresponding S3 file key by appending the relative path
                    relative_path = os.path.relpath(local_file_path, filepath)
                    s3_file_key_full = os.path.join(s3_file_key, relative_path)
                    
                    # Upload the file
                    s3.upload_file(local_file_path, bucket_name, s3_file_key_full)
                    print(f"Uploaded {local_file_path} to s3://{bucket_name}/{s3_file_key_full}")
        else:
            # If it's a file, upload the single file
            s3.upload_file(filepath, bucket_name, s3_file_key)
            print(f"Uploaded {filepath} to s3://{bucket_name}/{s3_file_key}")
    except Exception as e:
        print(f"Unable to upload file or directory: {e}")

After the upload completes, you should see your data in the S3 console.


Okay great! So we’ve now completed two toy examples:

  • We created a simple random image dataset in MDS format
  • We created a pre-tokenized dataset using some open-source text data, converted that to MDS format, and pushed to S3

Cut to the chase

Toy examples are great and all, but what do we do when it’s finally time to work with a big dataset? Let’s move up to a larger one: OpenWebText (Skylion007/openwebtext on Hugging Face).

This is also used in the NanoGPT repo, and we follow that approach here so you can continue on to train an at-scale replica of GPT-2. OpenWebText is a close-to-real-world replication of the dataset used to train GPT-2, as the actual data was never open-sourced by “Open”AI. Anyways, let’s tokenize this in the same way we tokenized the smaller dataset; I’ll just point you to this file. Once you’ve run that, you should have locally stored train.bin and val.bin files that are much larger than the previous ones. This presents us with a unique challenge: how can we convert this into MDS in a reasonable amount of time and transfer it to S3?

Indeed, if you attempt to run the conversion script as a single linear loop, you may just see the first proton decay before it finishes. Clearly, we will need to parallelize this to some extent, both during conversion to MDS and during upload to S3.

NOTE

You can get some cheat codes from several of the dataset conversion scripts in the Mosaic repo; for example, here is the C4 dataset conversion. These examples differ from our situation, as they convert un-tokenized text into MDS format. This means that you would need to tokenize the data during loading, or at least prior to moving it into your model. Notice how they use PyTorch parallelization in conjunction with the Hugging Face datasets class. Also note how the Hugging Face dataset class uses the streaming option; this is a bit of a double-edged sword, as it reduces the RAM/disk storage requirements but is, in our experience, much slower than the non-streaming option. The non-streaming option creates large cache files (Apache Arrow based) that can take up to 8x the storage size of the base dataset, so be careful (it’s gotten us before).

Okay, but let’s focus on the task at hand: converting our pre-tokenized binary files into MDS format in a time- and compute-efficient manner. We will use some parallel processing here:

import os
from multiprocessing import Manager, Pool, cpu_count
from shutil import rmtree
from uuid import uuid4

import numpy as np
from streaming import MDSWriter

def convert_bin_to_mds_multi_process(data_file, out_root, block_size, batch_size=512, num_workers=None):
    """Convert a binary file to MDS format using multiprocessing."""
    rmtree(out_root, ignore_errors=True)
    os.makedirs(out_root, exist_ok=True)

    # Load the dataset
    data = np.memmap(data_file, dtype=np.uint16, mode="r")
    total_samples = len(data)

    # Determine number of workers
    if num_workers is None:
        num_workers = cpu_count()

    # Divide the dataset into chunks for each worker
    chunk_size = total_samples // num_workers
    chunks = [
        (data[i * chunk_size: (i + 1) * chunk_size], i, out_root, block_size, 'zstd', batch_size)
        for i in range(num_workers)
    ]

    if total_samples % num_workers:
        chunks.append((data[num_workers * chunk_size:], num_workers, out_root, block_size, 'zstd', batch_size))

    # Use multiprocessing with progress tracking
    with Manager() as manager:
        progress_queue = manager.Queue()
        pool = Pool(processes=num_workers)

        # Start progress bar in the main process
        progress_updater = pool.apply_async(update_progress_bar, (progress_queue, total_samples, block_size))

        # Start worker processes
        pool.starmap(process_chunk, [(chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], progress_queue) for chunk in chunks])

        # Close the pool and wait for workers to finish
        pool.close()
        pool.join()

        # Wait for the progress updater to finish
        progress_updater.wait()

    print("Conversion complete!")

With a process chunk function:

def process_chunk(chunk_data, chunk_idx, out_root, block_size, compression, batch_size, progress_queue):
    """Process a single chunk of the dataset and write it to MDS format."""
    out_dir = os.path.join(out_root, f"shard_{chunk_idx}")
    os.makedirs(out_dir, exist_ok=True)

    columns = {
        'uuid': 'str',
        'x': 'ndarray',
        'y': 'ndarray'
    }

    with MDSWriter(out=out_dir, columns=columns, compression=compression, size_limit=1 << 30) as out:
        total_samples = len(chunk_data) - block_size
        for start_idx in range(0, total_samples, batch_size):
            batch = []
            for idx in range(start_idx, min(start_idx + batch_size, total_samples)):
                uuid = str(uuid4())
                x = chunk_data[idx: idx + block_size].astype(np.int64)
                y = chunk_data[idx + 1: idx + 1 + block_size].astype(np.int64)
                sample = {
                    'uuid': uuid,
                    'x': x,
                    'y': y
                }
                batch.append(sample)

            for sample in batch:
                out.write(sample)

            # Update progress queue
            progress_queue.put(len(batch))
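The update_progress_bar helper handed to apply_async above is one of the pieces left out for brevity; a minimal sketch (a hypothetical helper that polls the shared queue and bails out once the workers go quiet) could look like:

import queue
import time

def update_progress_bar(progress_queue, total_samples, block_size):
    """Hypothetical helper: report progress from counts pushed by the workers."""
    processed = 0
    start_time = time.time()
    while True:
        try:
            # Workers push the number of samples they just wrote
            processed += progress_queue.get(timeout=60)
        except queue.Empty:
            # Assume the workers are finished if nothing arrives for a while
            break
        elapsed = time.time() - start_time
        # Each worker drops block_size samples at its chunk boundary, so this
        # fraction approaches (but may not quite reach) 100%
        frac = processed / max(total_samples - block_size, 1)
        print(f"Progress: {frac:.2%} | Elapsed: {elapsed:.0f}s", end="\r")
    print()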

A brief explanation of what’s going on here:

  • We load our .bin memmap file
  • Determine the total number of tokens in the dataset from its length
  • Determine the number of CPU cores we have available
  • Split the binary file amongst the CPU cores
  • Within each core, we use the process_chunk function to create autoregressive samples and encode them into MDS format (an example call tying this together follows this list)
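Putting it together, a call for the much larger OpenWebText train split might look like this (paths are illustrative):

convert_bin_to_mds_multi_process(
    data_file='train.bin',
    out_root='./openwebtext_mds_train',
    block_size=1024,
    batch_size=512,
    num_workers=None,  # defaults to all available CPU cores
)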

After we complete this step, you will have a lot of subdirectories, each containing an index.json file and a bunch of MDS shards. The next crucial step in managing all this is to create a master index.json file. We can do this simply by running:

from streaming.base.util import merge_index
merge_index(out_root, keep_local=True)

This important command will take the index.json files from each subdirectory and merge them into one master index.json file, so your final directory structure will look like:

[Figure: example directory structure, with per-worker shard subdirectories and a merged top-level index.json]

Okay! Now you have workable data in MDS format, and your final task is to push it to S3. The upload function also needs to be parallelized for the transfer, but it’s fairly straightforward, so I’ll save space and let you take a stab at it yourself.

Conclusion

You’ve now seen how to move data back and forth between MDS format and S3. MDS supports the other major cloud providers as well, so if you use a provider other than AWS, it’s no problem. The MDS library provides many strong benefits for training large language models at scale, as our research team can attest to firsthand. Whether you are training on a few GPUs or hundreds, the library can support your tech stack, and it handles many different data types. If you’re a multimodal crew, MDS makes it easy to mix datasets together as well. We’ve only just scratched the surface of what the MDS library has to offer; hopefully this serves as a useful primer to get started. If you would like clarification on any of the code used (or would like some of the helper functions we left out for brevity), don’t hesitate to reach out to our team for help. Until next time!

About Aimpoint Digital

Aimpoint Digital is a market-leading analytics firm at the forefront of solving the most complex business and economic challenges through data and analytical technology. From integrating self-service analytics to implementing AI at scale and modernizing data infrastructure environments, Aimpoint Digital operates across transformative domains to improve the performance of organizations. Learn more by visiting: https://www.aimpointdigital.com

Author
Aaron McClendon
Head of AI
