Source code for meep_adjoint.dft_cell

"""Mostly convenience wrappers around various objects in core pymeep.

   v3, V3 convert between mp.Vector3 and simple lists or np.arrays of coordinates

   Grid repackages 'array metadata' with some convenient redundancy

   Subregion replaces the 6 separate pymeep data structures used
   to describe grid subregions ('FluxRegion', 'FieldsRegion', etc.)

   DFTCell similarly replaces the 5 separate data structures used
   to describe sets of frequency-domain field components.
"""

import numpy as np
import meep as mp
import warnings
from collections import namedtuple

from . import get_adjoint_option as adj_opt

######################################################################
# general-purpose constants and utility routines
######################################################################
ORIGIN           = np.zeros(3)
XHAT, YHAT, ZHAT = [ np.array(a) for a in [[1.,0,0], [0,1.,0], [0,0,1.]] ]
E_CPTS           = [mp.Ex, mp.Ey, mp.Ez]
H_CPTS           = [mp.Hx, mp.Hy, mp.Hz]
EH_CPTS          = E_CPTS + H_CPTS
EH_TRANSVERSE    = [ [mp.Ey, mp.Ez, mp.Hy, mp.Hz],
                     [mp.Ez, mp.Ex, mp.Hz, mp.Hx],
                     [mp.Ex, mp.Ey, mp.Hx, mp.Hy] ]


def V3(a1=0.0, a2=0.0, a3=0.0):
    """Pack vector for passage to core meep routines."""
    v=v3(a1,a2,a3)
    return mp.Vector3(v[0],v[1],v[2])


def v3(a1=0.0, a2=0.0, a3=0.0):
    """Unpack vector returned by core meep routine."""
    if type(a1) in [list, np.array, np.ndarray]:
        return np.array([float(x) for x in (list(a1)+[0,0])[0:3] ])
    if isinstance(a1,mp.Vector3):
        return a1.__array__()
    return np.array([a1,a2,a3])

######################################################################
# fix a bug in libmeep
######################################################################
def fix_array_metadata(xyzw, center, size):
    """fixes for the perenially buggy get_array_metadata routine in core meep."""
    for d in range(0,3):
        if size[d]==0.0 and xyzw[d][0]!=center[d]:
#            warnings.warn('correcting for bug in get_array_metadata: {}={}-->{}'.format('xyz'[d],xyzw[d][0],center[d]))
            xyzw[d]=np.array([center[d]])
        else:
            xyzw[d]=np.array(xyzw[d])


######################################################################
# 'Grid' is a convenience extension of 'array metadata'
######################################################################
Grid = namedtuple('Grid', ['xtics', 'ytics', 'ztics', 'points', 'weights', 'shape'])

def make_grid(size, center=np.zeros(3), dims=None, length=None):
    """Construct a Grid for a rectangular subregion.

    Parameters
    ----------
    size : [list or numpy array]
        size of region
    center : [list or numpy array], optional
        center of region, by default np.zeros(3)
    dims : [list of integer], optional
        numbers of grid points in each dimension, by default None
    length : [float], optional
        discretization lengthscale, by default None

    Returns
    -------
    New instance of Grid.
    """

    nd = len(np.flatnonzero(size))
    center, size = np.array(center)[0:nd], np.array(size)[0:nd]
    if length is not None:
        dims = [max(1,int(np.ceil(s/length))) for s in size]
    if dims is None:
        dims = [(10 if s>0 else 1) for s in size]
    pmin, pmax = center-0.5*size, center+0.5*size
    tics = [np.linspace(a, b, n) for (a, b, n) in zip(pmin, pmax, dims)]
    if len(tics)==2:
        tics.append([0])
    points = [np.array([x, y, z]) for x in tics[0] for y in tics[1] for z in tics[2] ]
    vol, ntot = np.prod([s for s in size if s>0]), np.prod(dims)
    weights = (vol/ntot) * np.ones(ntot)
    shape = [len(t) for t in tics if len(t)>1]
    return Grid(tics[0], tics[1], tics[2], points, weights, shape)


def xyzw2grid(xyzw):
    """Construct Grid from lists of points and weights."""
    return Grid(xyzw[0], xyzw[1], xyzw[2],
                [mp.Vector3(x,y,z) for x in xyzw[0] for y in xyzw[1] for z in xyzw[2]],
                 xyzw[3].flatten(), xyzw[3].shape)


[docs]class Subregion(object): """Subregion of computational cell. A Subregion is a finite hyperrectangular region of space contained within the extents of the FDTD computational grid. A Subregion may be of codimension 1 (i.e. it is a line in 2D or a plane in 3D), in which case it has a normal direction. Codim-1 subregions are used to define eigenmode sources and to evaluate Poynting fluxes and eigenmode expansion coefficients. Alternatively, the subregion may be of codimension 0 [i.e. it is a full rectangle (2D) or box (3D)], or of dimension 0 [i.e. it is a point]. The normal direction is undefined in these cases. A Subregion has a name (str), which is user-assigned or autogenerated if left unspecified. Note ---- ``Subregion`` is a common replacement for ``FluxRegion``, ``EnergyRegion``, and the other similar data structures in :program:`libmeep`. Its advantages are that (a) it eliminates redundant code and cumbersome API conventions imposed by the coexistence of multiple distinct data structures all describing the same thing, and that (b) it adds a user-assignable name field, which would be convenient to have already in core :program:`pymeep` but is essential in ``meep_adjoint`` to allow objective quantities to be assigned names that identify the objective region for which they are defined. Parameters ---------- xmin : array-like, optional minimal corner, by default None xmax : array-like, optional maximal corner, by default None center : array-like, optional center, by default ``ORIGIN`` size : array-like, optional size, by default None normal : array-like, optional normal direction for codim-1 regions, by default None name : str, optional arbitrary caller-chosen label; autogenerated if not specified """ def __init__(self, xmin=None, xmax=None, center=ORIGIN, size=None, normal=None, dir=None, name=None): if (xmin is not None) and (xmax is not None): (self.xmin, self.xmax) = (v3(xmin), v3(xmax)) (self.center, self.size) = (0.5*(self.xmax+self.xmin)), ((self.xmax-self.xmin)) elif size is not None: (self.center, self.size) = (v3(center), v3(size)) self.xmin, self.xmax = [self.center + sgn*self.size for sgn in [-1,1] ] self.normal, self.name = (dir if normal is None else normal), name
dft_cell_names=[]
[docs]class DFTCell(object): """Simpler data structure for frequency-domain fields in MEEP The instantiating data of a DFTCell are a grid subregion, a set of field components, and a list of frequencies. These metadata fields remain constant throughout the lifetime of a DFTCell. In addition to the metadata, instances of DFTCell allocate internal arrays of DFT registers in which the specified components of the frequency-domain fields at the specified grid points and frequencies are accumulated over the course of a MEEP timestepping run. These registers are reset to zero at the beginning of the next timestepping run, but in the meantime you can use the ``save_fields`` method to save an internally cached snapshot of the fields computed on a given timestepping run. Note: for now, all arrays are stored in memory. For large calculations with many DFT frequencies it might make sense to implement a disk-caching scheme. The internally-stored frequency-domain fields at a single frequency may be fetched via the get_EH_slice (single component) or get_EH_slices (all components) methods. By default these routines return slices of the currently active fields (i.e. the fields computed on the most recent timestepping run), but they accept an optional parameter specifying the label of a data set stored by a call to ``save_fields`` after a previous timestepping run. For convenience, DFTCell also offers a get_eigenmode_slices() method that computes and returns eigenfield profiles in the same format as DFT fields. DFTCells also know how to crunch their internally-stored field-component data to compute various physical quantities of interest, such as Poynting fluxes and field energies. We consider this the primary functionality exported by DFTCell, and thus implement it as the __call__ method of the class. DFTCell replaces the DftFlux, DftFields, DftNear2Far, DftForce, DftLdos, and DftObj structures in core pymeep. Parameters ---------- region : Subregion Subregion of computational cell in which to compute frequency-domain field components. components : list of meep components (i.e. [mp.Ex, mp.Hy]), optional Field components to compute. fcen, df, nfreq: float, float, int Set of frequencies at which to compute FD fields. """ def __init__(self, region, components=None, fcen=None, df=None, nfreq=None): self.region = region self.normal = region.normal self.celltype = 'flux' if self.normal is not None else 'fields' self.components = components or (EH_TRANSVERSE[self.normal] if self.normal is not None else EH_CPTS) self.fcen = adj_opt('fcen') if fcen is None else fcen self.df = adj_opt('df') if df is None else df self.nfreq = adj_opt('nfreq') if nfreq is None else nfreq self.freqs = [self.fcen] if self.nfreq==1 else np.linspace(self.fcen-0.5*self.df, self.fcen+0.5*self.df, self.nfreq) self.sim = None # mp.simulation for current simulation self.dft_obj = None # meep DFT object for current simulation self.EH_cache = {} # cache of frequency-domain field data computed in previous simulations self.eigencache = {} # cache of eigenmode field data to avoid redundant recalculations global dft_cell_names if region.name is not None: self.name = '{}_{}'.format(region.name, self.celltype) else: self.name = '{}_{}'.format(self.celltype, len(dft_cell_names)) dft_cell_names.append(self.name) # Although the subgrid covered by the cell is independent of any # mp.simulation, at present we can't compute subgrid metadata # without first instantiating a mp.Simulation, so we have to # wait to initialize the 'grid' field of DFTCell. TODO make the # grid metadata calculation independent of any mp.Simulation or meep::fields # object; it only depends on the resolution and extents of the Yee grid # and thus logically belongs in `vec.cpp` or another code module that # exists independently of fields, structures, etc. self.grid = None
[docs] def register(self, sim): """ 'Register' the cell in a MEEP simulation to request computation of frequency-domain fields. Parameters ---------- sim : mp.Simulation """ self.sim = sim if self.celltype == 'flux': flux_region = mp.FluxRegion(V3(self.region.center),V3(self.region.size),direction=self.normal) self.dft_obj = sim.add_flux(self.fcen,self.df,self.nfreq,flux_region) else: self.dft_obj = sim.add_dft_fields(self.components, self.freqs[0], self.freqs[-1], self.nfreq, center=V3(self.region.center), size=V3(self.region.size)) # take this opportunity to initialize simulation-dependent fields if self.grid is None: xyzw=sim.get_array_metadata(center=V3(self.region.center), size=V3(self.region.size), collapse=True, snap=True) fix_array_metadata(xyzw, self.region.center, self.region.size) self.grid = xyzw2grid(xyzw)
###################################################################### ######################################################################
[docs] def get_EH_slice(self, c, nf=0): """Fetch array of frequency-domain amplitudes for a single field component. Compute an array of frequency-domain field amplitudes, i.e. a frequency-domain array slice, for a single field component at a single frequency in the current simulation. This is like mp.get_dft_array(), but 'zero-padded:' when the low-level DFT object does not have data for the requested component (perhaps because it vanishes identically by symmetry), this routine returns an array of the expected dimensions with all zero entries, instead of a rank-0 array that prints out as a single preposterously large or small floating-point number, which is the not-very-user-friendly behavior of mp.get_dft_array(). Parameters ---------- c : (mp.component) Field component to fetch. nf : int, optional Frequency index, by default 0 Returns ------- np.array Complex-valued array giving field amplitude at grid points. """ EH = self.sim.get_dft_array(self.dft_obj, c, nf) return EH if np.ndim(EH)>0 else 0.0j*np.zeros(self.grid.shape)
[docs] def get_EH_slices(self, label=None, nf=0): """Fetch arrays of frequency-domain field amplitudes for all stored components. Return a 1D array (list) of arrays of frequency-domain field amplitudes, one for each component in this DFTCell, at a single frequency in a single MEEP simulation. The simulation in question may be the present, ongoing simulation (if label==None), in which case the array slices are read directly from the currently active meep DFT object; or it may be a previous simulation (identified by label) for which DFTCell::save_fields(label) was called at the end of timestepping. Parameters ---------- label : [str], optional Label of saved data to retrieve. nf : int, optional Frequency index, by default 0 Returns ------- list of np.array Arrays of field-component amplitudes at grid points Raises ------ ValueError if no data exists for the specified label. """ if label is None: return [ self.get_EH_slice(c, nf=nf) for c in self.components ] elif label in self.EH_cache: return self.EH_cache[label][nf] raise ValueError("DFTCell {} has no saved data for label '{}'".format(self.name, label))
[docs] def subtract_incident_fields(self, EHT, nf=0): """Substract incident from total fields to yield scattered fields. Parameters ---------- EHT : list of arrays of field component amplitudes, as returned by get_EH_slices, for the **total** fields nf : int, optional frequency index, by default 0 """ EHI = self.get_EH_slices(label='incident', nf=nf) for nc, c in enumerate(self.components): EHT[nc] -= EHI[nc]
[docs] def save_fields(self, label): """ Save current values of grid fields internally for later use. This routine tells the DFTCell to create and save an archive of the frequency-domain array slices for the present simulation---i.e. to copy the frequency-domain field data out of the sim.dft_obj structure and into an appropriate data buffer in the DFTCell, before the sim.dft_obj data vanish when sim is reset for the next run. This routine should be called after timestepping is complete. The given label is used to identify the stored data for future retrieval. Parameters ---------- label : str Label assigned to data set, used subsequently for retrieval. """ self.EH_cache[label] = [self.get_EH_slices(nf=nf) for nf in range(len(self.freqs))]
[docs] def get_eigenmode_slices(self, mode, nf=0): """Like get_EH_slices, but for eigenmode fields. Return a 1D array (list) of arrays of field amplitudes for all tangential E,H components at a single frequency---just like get_EH_slices()---except that the sliced E and H fields are the fields of eigenmode #mode. Parameters ---------- mode : int Eigenmode index. nf : int, optional Frequency index, by default 0 Returns ---------- list of np.arrays (same format as returned by get_EH_slices) """ # look for data in cache tag='M{}.F{}'.format(mode,nf) if self.eigencache and tag in self.eigencache: return self.eigencache[tag] # data not in cache; compute eigenmode and populate slice arrays freq, dir, k0 = self.freqs[nf], self.normal, mp.Vector3() vol = mp.Volume(V3(self.region.center),V3(self.region.size)) eigenmode = self.sim.get_eigenmode(freq, dir, vol, mode, k0) def get_eigenslice(eigenmode, grid, c): return np.reshape( [eigenmode.amplitude(p,c) for p in grid.points], grid.shape ) eh_slices=[get_eigenslice(eigenmode,self.grid,c) for c in self.components] # store in cache before returning if self.eigencache is not None: self.eigencache[tag]=eh_slices return eh_slices
def __call__(self, qcode, mode=0, nf=0): """Compute and return the value of an objective quantity. Computes an objective quantity, i.e. an eigenmode coefficient or a scattered or total power. Parameters ---------- qcode : [str] Objective quantity code. mode : int, optional Eigenmode index, by default 0 nf : int, optional Frequency index, by default 0 Returns ------- float64 or complex128 value of objective quantity """ w = np.reshape(self.grid.weights,self.grid.shape) EH = self.get_EH_slices(nf=nf) quantity=qcode.upper() if qcode.islower(): self.subtract_incident_fields(EH,nf) if quantity=='S': return np.real(np.sum(w*( np.conj(EH[0])*EH[3] - np.conj(EH[1])*EH[2]) )) if quantity.upper() in 'PFMB': eh = self.get_eigenmode_slices(mode, nf) # EHList of eigenmode fields eH = np.sum( w*(np.conj(eh[0])*EH[3] - np.conj(eh[1])*EH[2]) ) hE = np.sum( w*(np.conj(eh[3])*EH[0] - np.conj(eh[2])*EH[1]) ) sign=1.0 if qcode in ['P','F'] else -1.0 return (eH + sign*hE)/4.0 if quantity in ['UE', 'UH', 'UM', 'UEH', 'UEM', 'UT']: q=0.0 if quantity in ['UE', 'UEH', 'UEM', 'UT']: eps = self.sim.get_dft_array(self.dft_obj, mp.Dielectric, nf) E2 = np.sum( [np.conj(EH[nc])*EH[nc] for nc,c in enumerate(self.components) if c in E_CPTS], axis=0 ) q += 0.5*np.sum(w*eps*E2) if quantity in ['UH', 'UM', 'UEH', 'UEM', 'UT']: mu = self.sim.get_dft_array(self.dft_obj, mp.Permeability, nf) H2 = np.sum( [np.conj(EH[nc])*EH[nc] for nc,c in enumerate(self.components) if c in H_CPTS], axis=0 ) q += 0.5*np.sum(w*mu*H2) return q else: # TODO: support other types of objectives quantities? ValueError('DFTCell {}: unsupported quantity type {}'.format(self.name,qcode))
###################################################################### ###################################################################### ###################################################################### def rescale_sources(sources): """Scale the overall amplitude of a spatial source distribution to compensate for the frequency dependence of its temporal envelope. In a MEEP calculation driven by sources with a pulsed temporal envelope T(t), the amplitudes of all frequency-f DFT fields will be proportional to T^tilde (f), the Fourier transform of the envelope. Here we divide the overall amplitude of the source by T^tilde(f_c) (where f_c = center frequency), which exactly cancels the extra scale factor for DFT fields at f_c. Args: sources: list of mp.Sources Returns: none (the rescaling is done in-place) """ for s in sources: envelope, fcen = s.src, s.src.frequency if callable(getattr(envelope, "fourier_transform", None)): s.amplitude /= envelope.fourier_transform(fcen)