
Artificial Intelligence
Handling large-scale datasets efficiently is one of the biggest bottlenecks in modern machine learning workflows and when pre-training LLMs. As datasets grow in size and complexity, traditional methods like memmap arrays or PyTorch DataLoaders can struggle to keep up, especially in distributed training environments. In this post, we will look at some of the key features of Mosaic MDS, understand how to convert our existing data into this format, and walk through a few toy and real-world examples to get familiar with usage.
The Mosaic ML team has put a lot of work into making some solid documentation, so I’d recommend you give that a read here. Mosaic Data Sharding is a data loading library that was introduced in 2023 with this announcement. The Mosaic data environment is set up to make training in distributed environments with large datasets as painless as possible. Indeed, there are many common issues when dealing with large datasets in large environments, such as:
These are just some of the issues we have dealt with when pre-training large language models at scale, but I’m sure there are others I’m forgetting to mention.
Mosaic Data Streaming was set up to remedy the above issues with an easy-to-use codebase that plays closely with the standard PyTorch dataloader syntax to make it extra easy use. The chart below shows a nice illustration of what can happen when changing the number of GPUs during a train cycle, which, with standard data loaders, can create nondeterminism. With the Mosaic DS package, however, we can see that the loss curve looks exactly the same regardless of the GPUs.
I won’t dwell too much on the benefits of the library here; there are plenty of resources out there that do so. For more information, check out the links provided above. For now, let's get into how to actually implement this code with some simple examples.
Let’s start out getting familiar with the MDS library by converting a single file to MDS format. We will use a code snippet taken from the MDS repo to start:
import numpy as np
from PIL import Image
from streaming import MDSWriter
# Local or remote directory in which to store the compressed output files
data_dir = './MDS_Blog/'
# A dictionary mapping input fields to their data types
columns = {
'image': 'jpeg',
'class': 'int'
}
# Shard compression, if any
compression = 'zstd'
# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
for i in range(100):
sample = {
'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
'class': np.random.randint(10),
}
out.write(sample)
The above creates a simple random noise image and saves it to a local directory called “MDS_Blog”. The output will look like this:
Go ahead and open up the index.json file. You'll see some metadata related to the files, including the data type expected in each column.
Now we have some simple data created in MDS format. This can be transferred to S3 and then used in the Dataloader.
Let’s do something a little more complex, though. Let’s get into some dummy data for language modeling. If you’ve never seen the Nano-GPT repo, I’d highly recommend you give it a read. In this repo, a from scratch transformer model is coded and then trained on pre-tokenized data.
Let’s start off small, using a file containing concatenated works from Shakespeare located here. We will download this data, tokenize it with an open-source tokenizer, and then save it to two files containing validation data and training data.
import os
import requests
import tiktoken
import numpy as np
# download the tiny shakespeare dataset
input_file_path = os.path.join(os.path.dirname(__file__), 'input.txt')
if not os.path.exists(input_file_path):
data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
with open(input_file_path, 'w', encoding='utf-8') as f:
f.write(requests.get(data_url).text)
with open(input_file_path, 'r', encoding='utf-8') as f:
data = f.read()
n = len(data)
train_data = data[:int(n*0.9)]
val_data = data[int(n*0.9):]
# encode with tiktoken gpt2 bpe
enc = tiktoken.get_encoding("gpt2")
train_ids = enc.encode_ordinary(train_data)
val_ids = enc.encode_ordinary(val_data)
print(f"train has {len(train_ids):,} tokens")
print(f"val has {len(val_ids):,} tokens")
# export to bin files
train_ids = np.array(train_ids, dtype=np.uint16)
val_ids = np.array(val_ids, dtype=np.uint16)
train_ids.tofile(os.path.join(os.path.dirname(__file__), 'train.bin'))
val_ids.tofile(os.path.join(os.path.dirname(__file__), 'val.bin'))
Okay, so now you should see two files, train.bin and val.bin, located local to wherever the code above ran. These files are in a special numpy binary data format; they can be easily read in the NanoGPT training loop as nummpy memmap arrays, so can easily load from local without eating up all your RAM, making them ideal for training models.
Now, let’s convert those train and val files we just made into MDS format for use with our DataLoader instances. We will use a slightly altered version of the code above:
def convert_bin_to_mds(data_file, out_root, block_size):
# Remove existing MDS directory if necessary
# data_file is the file (.bin in our case)
# out_root is a local directory to store the MDS outut
# block_size is our desired context length
rmtree(out_root, ignore_errors=True)
# Define columns based on your dataset format
columns = {
'uuid': 'str',
'x': 'ndarray',
'y': 'ndarray'
}
compression = 'zstd'
# Original memmap array
data = np.memmap(data_file, dtype=np.uint16, mode="r")
total_steps = len(data) - block_size
# Writer to MDS format
with MDSWriter(out=out_root, columns=columns, compression=compression) as out:
start_time = time.time() # Start timer
for idx in range(total_steps):
# Define a unique identifier and fetch sequences
uuid = str(uuid4())
x = data[idx: idx + block_size].astype(np.int64)
y = data[idx + 1: idx + 1 + block_size].astype(np.int64)
# Write each sample to MDS
sample = {
'uuid': uuid,
'x': x,
'y': y
}
out.write(sample)
# Print progress every 10,000 iterations
if idx % 10_000 == 0:
elapsed_time = time.time() - start_time
progress = (idx + 1) / total_steps
remaining_time = (elapsed_time / progress) - elapsed_time
print(f"Progress: {progress:.2%} | "
f"Elapsed Time: {elapsed_time:.2f}s | "
f"Estimated Remaining Time: {remaining_time:.2f}s")
You can run this function directly on the train.bin and val.bin files we just made, and you should see an output like:
Now, you can easily use the MDS files in a train loop. Please note that you could also directly use the .bin files in a train loop by defining a simple wrapper class that allows the dataloaders to interact directly with the binary files.
class ExtensionDataset(Dataset):
"""
Sampler based on Pytorch datasets for use
with Mosaic composer.
Args:
None
Returns:
"""
def __init__(self, data_file: str, block_size: int):
# Memory-mapped access to binary file
self.data = np.memmap(data_file, dtype=np.uint16, mode="r")
self.block_size = block_size
def __len__(self):
# Return the number of sequences in the dataset
return len(self.data) - self.block_size
def __getitem__(self, idx):
# Load a single sequence and its corresponding target sequence
x = torch.from_numpy(
self.data[idx : idx + self.block_size].astype(np.int64)
)
y = torch.from_numpy(
self.data[idx + 1 : idx + 1 + self.block_size].astype(np.int64)
)
return x, y
The above code block could be used in conjunction with the below snippet:
train_dataset = AxonDataset(data_file=os.path.join(config.data_dir, "train.bin"), block_size=config.model.block_size)
train_dataloader = DataLoader(train_dataset, batch_size=config.model.batch_size, shuffle=None)
So, we are able to use Mosaic Composer easily with either the binary files directly, or with the MDS data.
If we upload our new Shakespeare MDS data to S3 with something like:
def upload_to_s3(filepath: str, bucket_name: str, s3_file_key: str):
try:
s3 = boto3.client('s3')
# Check if the filepath is a directory
if os.path.isdir(filepath):
# Iterate through all files in the directory
for root, dirs, files in os.walk(filepath):
for file in files:
# Construct full local file path
local_file_path = os.path.join(root, file)
# Create the corresponding S3 file key by appending the relative path
relative_path = os.path.relpath(local_file_path, filepath)
s3_file_key_full = os.path.join(s3_file_key, relative_path)
# Upload the file
s3.upload_file(local_file_path, bucket_name, s3_file_key_full)
print(f"Uploaded {local_file_path} to s3://{bucket_name}/{s3_file_key_full}")
else:
# If it's a file, upload the single file
s3.upload_file(filepath, bucket_name, s3_file_key)
print(f"Uploaded {filepath} to s3://{bucket_name}/{s3_file_key}")
except Exception as e:
print(f"Unable to upload file or directory: {e}")
You should see your data in S3 now:
Okay great! So we’ve now completed two toy examples:
Toy examples are great and all, but what do we do when it’s finally time to work with a big dataset? Let’s move up to a larger dataset: Skylion007/openwebtext · Datasets at Hugging Face
This is also used in the NanoGPT repo, and we follow that here to allow you to continue to train an at scale replica of GPT2. Openwebtext is a close to real world replication of the dataset used to train GPT2, as the actual data was never open-sourced by “Open”AI. Anyways, let’s tokenize this in the same way we tokenized the smaller dataset; I’ll just point you to this file. Once you’ve run that, you should have locally stored train.bin and val.bin files that are much larger than the previous ones. This presents us with a unique challenge, how can we convert this into MDS in a reasonable amount of time and transfer it to S3?
Indeed, if you attempt to run the script in a linear loop you may just see the first proton decay. Clearly, we will need to parallelize this to some extent, both during conversion to MDS and during upload to S3
You can get some cheat codes from several of the dataset downloads from the Mosaic repo, for example here is the C4 dataset conversion. These examples differ from our situation, as they are converting un-tokenized text into MDS format. This means that you would need to tokenize this data during loading or at least prior to movement into your model. Notice how they use the PyTorch parallelization in conjunction with Huggingface datasets class. Also note how the Hugginface dataset class uses the streaming option; this is a bit of a double-edged sword, as it reduces the RAM/Disk storage requirements but is, in our experience, much slower than the non-streaming option. The non-streaming option creates large cache files (Apache arrow based) that can take up to 8x the storage size of the base dataset, so be careful (it’s gotten us before).
Okay, but let’s focus on the task at hand, converting our pre-tokenized binary files into MDS format in a time and compute efficient manner. We will use some parallel processing here:
def convert_bin_to_mds_multi_process(data_file, out_root, block_size, batch_size=512, num_workers=None):
"""Convert a binary file to MDS format using multiprocessing."""
rmtree(out_root, ignore_errors=True)
os.makedirs(out_root, exist_ok=True)
# Load the dataset
data = np.memmap(data_file, dtype=np.uint16, mode="r")
total_samples = len(data)
# Determine number of workers
if num_workers is None:
num_workers = cpu_count()
# Divide the dataset into chunks for each worker
chunk_size = total_samples // num_workers
chunks = [
(data[i * chunk_size: (i + 1) * chunk_size], i, out_root, block_size, 'zstd', batch_size)
for i in range(num_workers)
]
if total_samples % num_workers:
chunks.append((data[num_workers * chunk_size:], num_workers, out_root, block_size, 'zstd', batch_size))
# Use multiprocessing with progress tracking
with Manager() as manager:
progress_queue = manager.Queue()
pool = Pool(processes=num_workers)
# Start progress bar in the main process
progress_updater = pool.apply_async(update_progress_bar, (progress_queue, total_samples, block_size))
# Start worker processes
pool.starmap(process_chunk, [(chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], progress_queue) for chunk in chunks])
# Close the pool and wait for workers to finish
pool.close()
pool.join()
# Wait for the progress updater to finish
progress_updater.wait()
print("Conversion complete!")
With a process chunk function:
def process_chunk(chunk_data, chunk_idx, out_root, block_size, compression, batch_size, progress_queue):
"""Process a single chunk of the dataset and write it to MDS format."""
out_dir = os.path.join(out_root, f"shard_{chunk_idx}")
os.makedirs(out_dir, exist_ok=True)
columns = {
'uuid': 'str',
'x': 'ndarray',
'y': 'ndarray'
}
with MDSWriter(out=out_dir, columns=columns, compression=compression, size_limit=1 << 30) as out:
total_samples = len(chunk_data) - block_size
for start_idx in range(0, total_samples, batch_size):
batch = []
for idx in range(start_idx, min(start_idx + batch_size, total_samples)):
uuid = str(uuid4())
x = chunk_data[idx: idx + block_size].astype(np.int64)
y = chunk_data[idx + 1: idx + 1 + block_size].astype(np.int64)
sample = {
'uuid': uuid,
'x': x,
'y': y
}
batch.append(sample)
for sample in batch:
out.write(sample)
# Update progress queue
progress_queue.put(len(batch))
A brief explanation of what’s going on here:
After we complete this step, you will have a lot of subdirectories, each containing an index.json file and a bunch of MDS shards. The next crucial step in managing all this is to create a master index.json file. We can do this simply by running:
from streaming.base.util import merge_index
merge_index(out_root, keep_local=True)
This important command will take the index.json files from each subdirectory and merge them into one manager index.json file, so your final directory structure will look like:
Okay! Now you have workable data in MDS format, and your final task is to push this to S3. The function to push to S3 still needs to multiprocess for the transfer, but it’s fairly straightforward so I’ll save space and let you take a stab at it yourself.
You’ve now seen how to move data back and forth into MDS format and S3. MDS supports other major cloud providers as well, so if you have a different provider than AWS, it’s no problem. The MDS library provides many strong benefits to training large language models at scale, as our research team can attest to firsthand. Whether you are training on a few GPUs or hundreds, the library can support your tech stack and also many different data types. If you’re a multimodal crew, MDS supports easily mixing datasets together as well. We’ve only just scratched the surface of what the MDS library has to offer; hopefully this serves as a useful primer to get started with the library. If you would like clarification on any of the code used (or would like some of the helper functions we left out for brevity), don’t hesitate to reach out to our team for help. Until next time!
Aimpoint Digital is a market-leading analytics firm at the forefront of solving the most complex business and economic challenges through data and analytical technology. From integrating self-service analytics to implementing AI at scale and modernizing data infrastructure environments, Aimpoint Digital operates across transformative domains to improve the performance of organizations. Learn more by visiting: https://www.aimpointdigital.com
Whether you need advanced AI solutions, strategic data expertise, or tailored insights, our team is here to help.