Refactoring yt Frontends - Part 1

In the still-in-development version of yt (4.0), the way that particles are handled has been redesigned from the ground up.

The current version of yt (3.x) utilizes an octree-based approach for meshing the particles, although not for indexing them – which presents some problems when doing subsets of particles, as well as when doing visualizations that rely on an implicit meshing. The main result is that, in general, particle visualizations in yt 3.x aren’t that great, and are underresolved.

In yt 4.0, the particle system has been reimplemented to use EWAH bitmap indices (for more info, see Daniel Lemire’s EWAHBoolArray repository) to track which “regions” of files correspond to particular spatial regions, as designated by indices in a space-filling curve. Things are now orders of magnitude faster to load, to subset, and to visualize – and the memory overhead is so much lower!

This work was led by Nathan Goldbaum and Meagan Lang, with crucial contributions from the rest of the yt community, including feedback and bugfixes from Bili Dong and Cameron Hummels.

Recently, I’ve been exploring using a different array backend in yt, right now focusing on dask. While yt does lots of MPI-parallel operations, much of what we do with these has to be hand-programmed – so when you implement a new DerivedQuantity (i.e., stuff like calling min on a data object) you have to jump through a few hoops related to intermediate values and the like. Plus, dask seems to be everywhere, and so if we exported to dask arrays or somehow interoperated better with it, we’d be able to interoperate with lots of the rest of the ecosystem more easily.
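To make that last point concrete, here is a minimal sketch of the pattern a chunk-aware reduction has to follow: compute an intermediate value per chunk, then combine the intermediates at the end. (This is just the general shape of the problem, not yt's actual DerivedQuantity API.)

import numpy as np

def chunked_min(chunks):
    """Compute the minimum over an iterable of NumPy arrays (the "chunks")."""
    intermediates = []
    for chunk in chunks:
        if chunk.size == 0:
            continue  # empty chunks contribute nothing
        intermediates.append(chunk.min())  # one intermediate value per chunk
    return np.min(intermediates)  # final reduction over the intermediates

# With a dask array, the same reduction is just darr.min().compute(); the
# intermediate bookkeeping lives in the task graph instead of in our code.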

Unfortunately, there’s a bit of an impedance mismatch which … has made this more difficult than I’d like.

Reading Data

Before getting too much further, though, I’m going to go through a bit about how yt thinks about “chunking” data.

The fundamental thing that yt does is index data. (Well, that, and take a while to compile all the Cython code.) Processing of the data is all layered on top of that – including some pretty cool semantics-of-data and units, visualization, etc. The main thing is that if you do a subset, it knows where to go to grab that subset of data, and if you want to do something that touches everything, it’ll do its best to reduce the number of times data is loaded off disk in service of that.

We do this with a “chunking” system, which is implemented differently depending on whether your data is discrete (i.e., particles), mesh-based, and so on.

So to show what the problem is, I’m going to load up a dataset from the FIRE project.

import yt
ds = yt.load("data/FIRE_M12i_ref11/snapshot_600.hdf5")
yt : [INFO     ] 2019-06-02 16:02:22,303 Calculating time from 1.000e+00 to be 4.355e+17 seconds
yt : [INFO     ] 2019-06-02 16:02:22,304 Assuming length units are in kpc/h (comoving)
yt : [INFO     ] 2019-06-02 16:02:22,337 Parameters: current_time              = 4.3545571088051386e+17 s
yt : [INFO     ] 2019-06-02 16:02:22,338 Parameters: domain_dimensions         = [1 1 1]
yt : [INFO     ] 2019-06-02 16:02:22,339 Parameters: domain_left_edge          = [0. 0. 0.]
yt : [INFO     ] 2019-06-02 16:02:22,341 Parameters: domain_right_edge         = [60000. 60000. 60000.]
yt : [INFO     ] 2019-06-02 16:02:22,342 Parameters: cosmological_simulation   = 1
yt : [INFO     ] 2019-06-02 16:02:22,343 Parameters: current_redshift          = 0.0
yt : [INFO     ] 2019-06-02 16:02:22,344 Parameters: omega_lambda              = 0.728
yt : [INFO     ] 2019-06-02 16:02:22,344 Parameters: omega_matter              = 0.272
yt : [INFO     ] 2019-06-02 16:02:22,345 Parameters: omega_radiation           = 0.0
yt : [INFO     ] 2019-06-02 16:02:22,347 Parameters: hubble_constant           = 0.702

At this point yt has done a tiny little bit of reading of the data – just enough to figure out some of the metadata. It hasn’t indexed anything yet or read any of the actual data fields off of disk.

Now let’s make a plot of the gas density, integrated over the z axis of the simulation. Keep in mind that in doing this, it will have to read all the gas particles and smooth them onto a buffer. The first time this gets run, an index is generated and then stored to disk. More on that in a moment.

I’m going to use ds.r[:] here for “dataset region, but the whole thing,” then call integrate on it, specifying the field to integrate. Then, I plot it.

p=ds.r[:].integrate("density", axis="z").plot(("gas", "density"))
yt : [INFO     ] 2019-06-02 16:02:22,484 Allocating for 4.787e+06 particles
Loading particle index: 100%|██████████| 10/10 [00:00<00:00, 817.25it/s]
yt : [INFO     ] 2019-06-02 16:02:23,623 xlim = 0.000000 60000.000000
yt : [INFO     ] 2019-06-02 16:02:23,623 ylim = 0.000000 60000.000000
yt : [INFO     ] 2019-06-02 16:02:23,633 Making a fixed resolution buffer of (('gas', 'density')) 800 by 800


(All that empty space is because there are only gas particles in the middle of the dataset!)

The first time any data needs to be read from a particle dataset, yt will construct an in-memory index of the data on disk; by default, it will store this in a sidecar file, so the next time that the dataset is read it does not need to be generated again.

The way the bitmap indices work is really fun, but that deserves its own blog post. Suffice it to say that the indexing helps to figure out both which files to read, and which subsets of those files to read, since we don’t assume that the particles are sorted in any way. (Mostly because each code tends to sort the particles in its own way!)
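Just to give a flavor of the idea, here is a toy illustration (emphatically not yt's actual EWAH implementation): map each particle to a coarse cell index standing in for its space-filling curve index, keep one set of touched indices per file standing in for the compressed bitmap, and intersect a query's indices against those sets to decide which files need to be opened at all.

import numpy as np

def cell_ids(positions, left_edge, right_edge, n=64):
    # Map positions onto flat indices of a coarse n**3 grid covering the domain.
    ijk = ((positions - left_edge) / (right_edge - left_edge) * n).astype(int)
    ijk = np.clip(ijk, 0, n - 1)
    return ijk[:, 0] * n * n + ijk[:, 1] * n + ijk[:, 2]

# Fake "files," each holding a clump of particle positions (all names made up).
rng = np.random.default_rng(0)
left_edge, right_edge = np.zeros(3), np.full(3, 60000.0)
files = {f"file_{i}": rng.uniform(10000 * i, 10000 * (i + 1), size=(1000, 3))
         for i in range(6)}

# One "bitmap" per file; a Python set of cell indices stands in for EWAH here.
index = {fn: set(cell_ids(pos, left_edge, right_edge)) for fn, pos in files.items()}

# A spatial query only touches files whose sets intersect the query's cells.
query = rng.uniform(20000, 25000, size=(100, 3))
query_cells = set(cell_ids(query, left_edge, right_edge))
print([fn for fn, cells in index.items() if cells & query_cells])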

Now, for projecting over the whole domain, it’s not that big a deal to read everything, since we have to anyway, but if we did a subset it could dramatically reduce the IO necessary, and it also keeps much less data resident in memory than the old implementation.

Continuing on, let’s say that we now want to center at a different location. We’d figure out the most dense point, and then set our center.

c = ds.r[:].argmax(("gas", "density"))

(One thing this next set of code highlights is that, in general, how we handle centers in yt is a bit clumsy at times. Writing this blog post led me to filing an issue which may or may not get any traction or support.)

p.set_origin("center-window")
p.set_center((c[0], c[1]))
p.zoom(25)
p.set_zlim(("gas","density"), 1e-6, 1e-3)
yt : [INFO     ] 2019-06-02 16:02:25,607 xlim = -713.911179 59286.088821
yt : [INFO     ] 2019-06-02 16:02:25,611 ylim = 1049.283652 61049.283652
yt : [INFO     ] 2019-06-02 16:02:25,619 Making a fixed resolution buffer of (('gas', 'density')) 800 by 800


So, we can visualize now, and it’s faster than it was before, and we also get much better results. Great. So why am I belaboring this point?

It’s because in the background, yt is querying a data object to see which items to read off disk, then reading those items off disk. In this particular instance, it is doing what we call “io” chunking – this means using whatever hints are available to produce the most efficient read ordering it knows how. Among other things, yt will try to minimize the number of times it opens a file and seeks within a file, and it tries to keep the memory allocation count as low as possible.
(I’ll write more on this last point later – much of what yt does to index in yt-3.x and yt-4.0 is designed to keep the number of allocated arrays in the IO routines as low as possible, and to avoid any expensive concatenation or subselection operations. It turns out, this is … not as big a deal as we thought when it was made a design principle. And in general, it leads to a lot more floating point operations than we would like, and sometimes more stuff in memory, too.)
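To give a feel for the kind of bookkeeping involved, here is a minimal, hypothetical sketch of one piece of it: grouping a list of (filename, start, end) subsets by file, so that each file is opened once and read in increasing offset order. yt's real IO handlers do considerably more than this.

from collections import defaultdict

def group_reads(subsets):
    # subsets: iterable of (filename, start, end) tuples describing reads.
    by_file = defaultdict(list)
    for filename, start, end in subsets:
        by_file[filename].append((start, end))
    # Sort ranges within each file so reads proceed forward, minimizing seeks.
    return {fn: sorted(ranges) for fn, ranges in by_file.items()}

print(group_reads([("snap.0.hdf5", 262144, 524288),
                   ("snap.1.hdf5", 0, 131072),
                   ("snap.0.hdf5", 0, 262144)]))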

And, so, uh, “chunking” is…?

We can figure out how yt chunks this data by, well, asking it to do it manually! Every data object presents a chunks interface which is a generator that modifies its internal state and then yields itself. For instance:

dd = ds.all_data()
for chunk in dd.chunks([], "io"):
    print(chunk["particle_ones"].size)
1048576
885527
753678
524288
317696
262144
262144
262144
262144
208609

I mentioned that this generator yields itself; this is true. But the internal state is modified to store where we are in the iteration, along with things like the parameters for derived fields and the like. The source for this looks like this:

from yt.data_objects.data_containers import YTSelectionContainer
YTSelectionContainer.chunks??
Signature: YTSelectionContainer.chunks(self, fields, chunking_style, **kwargs)
Docstring: <no docstring>
Source:   
    def chunks(self, fields, chunking_style, **kwargs):
        # This is an iterator that will yield the necessary chunks.
        self.get_data() # Ensure we have built ourselves
        if fields is None: fields = []
        # chunk_ind can be supplied in the keyword arguments.  If it's a
        # scalar, that'll be the only chunk that gets returned; if it's a list,
        # those are the ones that will be.
        chunk_ind = kwargs.pop("chunk_ind", None)
        if chunk_ind is not None:
            chunk_ind = ensure_list(chunk_ind)
        for ci, chunk in enumerate(self.index._chunk(self, chunking_style,
                                   **kwargs)):
            if chunk_ind is not None and ci not in chunk_ind:
                continue
            with self._chunked_read(chunk):
                self.get_data(fields)
                # NOTE: we yield before releasing the context
                yield self
File:      ~/yt/yt/yt/data_objects/data_containers.py
Type:      function

Note that this relies on the index object providing the _chunk routine, which interprets the type of chunking. Also, _chunked_read is a context manager which looks like this:

YTSelectionContainer._chunked_read??
Signature: YTSelectionContainer._chunked_read(self, chunk)
Docstring: <no docstring>
Source:   
    @contextmanager
    def _chunked_read(self, chunk):
        # There are several items that need to be swapped out
        # field_data, size, shape
        obj_field_data = []
        if hasattr(chunk, 'objs'):
            for obj in chunk.objs:
                obj_field_data.append(obj.field_data)
                obj.field_data = YTFieldData()
        old_field_data, self.field_data = self.field_data, YTFieldData()
        old_chunk, self._current_chunk = self._current_chunk, chunk
        old_locked, self._locked = self._locked, False
        yield
        self.field_data = old_field_data
        self._current_chunk = old_chunk
        self._locked = old_locked
        if hasattr(chunk, 'objs'):
            for obj in chunk.objs:
                obj.field_data = obj_field_data.pop(0)
File:      ~/yt/yt/yt/data_objects/data_containers.py
Type:      function

This is a bit clunky, but it stores the old state (because, believe it or not, sometimes we have multiple levels of chunking simultaneously, especially for things like spatial derivatives) and then it makes a fresh state, and then it resets it after the context manager concludes.

So the end result here is that we have a mechanism that divides the dataset up into the chunks it needs (YTDataChunk objects), and then iterates over them. What does this look like for our particle dataset? Well, we can find out, evidently, by looking at the _current_chunk attribute on the object yielded by chunks.

I’ve changed what we print out here just a little bit, because I want to keep the output a bit more human readable, but this is what it looks like:

dd = ds.all_data()
for chunk in dd.chunks([], "io"):
    print("\nExamining chunk...")
    for obj in chunk._current_chunk.objs:
        print("    Examining obj...",)
        for data_file in obj.data_files:
            print("        {}: {}-{}".format(data_file.filename, data_file.start, data_file.end))
Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 0-262144

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 262144-524288

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 524288-786432

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 786432-1048576

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 1048576-1310720

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 1310720-1572864

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 1572864-1835008

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 1835008-2097152

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 2097152-2359296

Examining chunk...
    Examining obj...
        /home/matthewturk/data/FIRE_M12i_ref11/snapshot_600.hdf5: 2359296-2567905

A few notes here. Each chunk is able to have multiple “objects” associated with it (which in grid frontends usually means multiple grid objects), but here we have only one entry in the objs list for each. Each obj then only has one item in data_files, which is not really a data file, but instead a subset of a data file specified by its start and end indices.
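In practice, those start and end indices turn into sliced reads of the relevant HDF5 datasets, something roughly like the following (this assumes a Gadget-style /PartType0/Density dataset and is not yt's actual IO handler):

import h5py

def read_density_subset(filename, start, end):
    with h5py.File(filename, "r") as f:
        # h5py only pulls the requested slice off disk, not the whole dataset.
        return f["/PartType0/Density"][start:end]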

If you’re thinking this is a bit clumsy, I would agree with you.

Dask Stuff

The issue that I wrote about at the start of this blog post shows up when we start looking at how these chunks are generated. In principle, this does not map that badly to how dask expects chunks to be emitted.

(At this point I need to admit that while I’ve worked with dask, it’s entirely possible that I am going to misrepresent its capabilities. Any errors are my own, and if I find out I am mistaken about any of this, I will happily update this blog post!)

It’s possible to create a dask array through the dask.array.Array constructor; this is described in the array design docs. Since yt uses unyt for attaching units we will need to do some additional work, but let’s imagine that we are simply happy dealing with unit-less (and, I suppose, unyt-less) arrays for now.

To generate these arrays most efficiently, we need to be able to specify their size and how to obtain them (plus maybe a couple of other things, but for our purposes those are the two most important).
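If we did have those two things in hand (say, a list of per-chunk sizes and a read_chunk(i) callable that returns chunk i as a NumPy array, both of which are hypothetical here), a sketch of building the array could skip the raw constructor and go through dask.delayed instead:

import numpy as np
import dask
import dask.array as da

def build_dask_field(read_chunk, chunk_sizes, dtype=np.float64):
    # One lazy piece per chunk; nothing is read until .compute() is called.
    pieces = [
        da.from_delayed(dask.delayed(read_chunk)(i), shape=(n,), dtype=dtype)
        for i, n in enumerate(chunk_sizes)
    ]
    return da.concatenate(pieces)

# e.g. density = build_dask_field(read_density_chunk, [262144, 262144, 208609])
# density.min().compute() would then pull chunks through read_chunk lazily.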

Unfortunately, as you might be able to tell, this is not information that is easily exposed without iterating over the dataset. Sure, if we iterated and read everything, of course we could produce the appropriate info. And I posted a little bit about how one might do this on issue 1891, but there’s a key thing going on in that code – yt has already read all the data from disk.

So, this isn’t ideal.

Chunks are not persistent

This all comes about because chunks are not persistent; more specifically, chunks are always created on-demand. Each data object will have its own set of chunks, and these will map differently onto the data. So, for instance, two data objects might end up selecting all the same sets of objects, but the resulting chunks will have different sizes (and even each different field might be a different size).

sp1 = ds.sphere(c, (1, "Mpc"))
sp2 = ds.r[ (20.0, "Mpc") : (40.0, "Mpc"),
            (25.0, "Mpc") : (45.0, "Mpc"),
            (55.0, "Mpc") : (65.0, "Mpc") ]

print("sp1 len == {}\nsp2 len == {}".format(
    len(list(sp1.chunks([], "io"))),
    len(list(sp2.chunks([], "io")))
))


print("sp1 => ", " ".join(str(chunk["particle_ones"].size) for chunk in sp1.chunks([], "io")))
print("sp2 => ", " ".join(str(chunk["particle_ones"].size) for chunk in sp2.chunks([], "io")))
sp1 len == 10
sp2 len == 10
sp1 =>  388571 306586 341808 205880 50260 2 1 2 3 0
sp2 =>  12 3673 480 29 146 200 77 419 3697 400

The trickiest part of this is that in these cases, we don’t know how big each one is going to be! For other types of indexing, it’s slightly different – the indexing system for grids and octrees and meshes can figure out in advance (without reading data from disk) the precise number of values that will be read. But for particles we don’t necessarily know.

Unfortunately, even if we did, the fact that the YTDataChunk objects are created and then yield-ed, rather than returned as a list of objects with known sizes, makes it harder to expose this to dask. In particular, we can’t (inexpensively) fast-forward the generator, rewind it, or access it elementwise, which makes it tricky to interface with. One can expose unknown chunk sizes to dask, but it seems like we could do better.
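For reference, here is roughly what unknown chunk sizes look like on the dask side: each piece advertises a NaN length, and (in recent dask versions) the data has to be walked once via compute_chunk_sizes() before shape-dependent operations work. The read_chunk function below is just a stand-in.

import numpy as np
import dask
import dask.array as da

def read_chunk(i):
    # Hypothetical stand-in for a per-chunk read returning a variable-length array.
    return np.random.random(1000 + i)

pieces = [da.from_delayed(dask.delayed(read_chunk)(i), shape=(np.nan,), dtype=float)
          for i in range(10)]
arr = da.concatenate(pieces)
print(arr.chunks)          # ten NaN-sized chunks along the only axis
arr.compute_chunk_sizes()  # reads every chunk just to learn its size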

So what can be done?

Well, let me first note that a lot of this is a result of trying to be clever! Back when the chunking system was being implemented, it seemed like simple generator expressions were the right way to do it. And, a bunch of layers have been added on top of those generator expressions that make it harder to simply strip that component out.

But recently, Britton Smith and I have been digging into some of the particle frontends, and we think we might have a solution that would both simplify a lot of this logic and make it a lot easier to expose the arrays to different array backends – specifically dask.

For more on that, wait for part two!

Matthew Turk
Assistant Professor of Information Sciences

I am interested in the intersection of data analysis, visualization and open source in the sciences.