Source code for meep_adjoint.timestepper

"""Implementation of TimeStepper class."""

import os
import sys
import psutil
import time
import numpy as np
import meep as mp

import warnings
from datetime import datetime as dt2

from . import (ObjectiveFunction, Basis, v3, V3, E_CPTS, log, update_dashboard)
from . import get_adjoint_option as adj_opt

from .console_manager import CODEWORD as CONSOLE_CODEWORD
from .console_manager import termsty

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#########################################################
# step function to update GUI dashboard progress bar
#########################################################
mt0, wt0, wtdb, wtcpu = 0, 0, 0, 0
update_interval, cpu_interval, proc = 1, 2, psutil.Process(os.getpid())
def dashboard_sf(sim):
    """Provisional implementation of step function to update GUI dashboard."""
    global mt0, wt0, wtdb, wtcpu, update_interval, cpu_interval, proc
    mt, wt = sim.round_time(), time.time()
    if mt>mt0 and (wt-wtdb)>=update_interval:
        updates = ['progress {}'.format(int(mt)),
                   'ms_per_timestep {}'.format(1000.0*(wt-wt0)/(mt-mt0))]
        if (wt-wtcpu) > cpu_interval:
            wtcpu = wt
            updates += ['cpu_usage {:.1f}'.format(proc.cpu_percent())]
        update_dashboard(updates)
        mt0, wt0, wtdb = mt, wt, wt
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


[docs]class TimeStepper(object): """ Abstraction of low-level FDTD engine. TimeStepper is a class that knows how to invoke an FDTD solver to execute a time-domain calculation to obtain frequency-domain (DFT) field components in objective regions, from which may be computed (a) the values of objective quantities and objective functions, or (b) permittivity derivatives of the objective function, from which may be computed the objective-function gradient. This class lies between the top-level `OptimizationProblem` session-manager class and lower-level classes like `ObjectiveFunction`. It exports a single method (``__call__``) which prepares and executes one complete FDTD timestepping run to compute frequency-domain fields, returning numerical quantities of interest computed from these fields. ``__call__`` accepts one `str`-valued argument `job`, for which the recognized values are `forward` or `adjoint`. For `job`==`forward`, the excitation sources for the FDTD problem are the user-defined sources passed to the `OptimizationProblem` constructor and the result of the calculation is the objective-function value (plus the values of any additional requested objective quantities). For `job`==`adjoint`, the excitation sources are automatically determined internally and the result of the calculation is the objective-function gradient. `TimeStepper` automatically determines when to terminate a timestepping run by monitoring the numerical convergence of the output quantities. The details of this process may be customized using configuration options. `TimeStepper` does not itself know how to evaluate its output quantities, but rather outsources these calculations to public methods of `ObjectiveFunction` and other low-level classes. Methods ------- __call__(job): Populate the FDTD grid with an appropriate source distribution, initialize DFT cells for tabulating frequency-domain fields, then execute FDTD timestepping until the output quantities converges and return those quantities. """ ######################################################### ######################################################### ######################################################### def __init__(self, obj_func, dft_cells, basis, sim, fwd_sources): """ Constructor. Parameters ---------- obj_func : ObjectiveFunction The function whose value or gradient we compute (copied from OptimizationProblem) dft_cells : list of `DFTCell`" List of `DFTCell` structures, assumed to be ordered with objective cells *first* and the design cell *last* (so any `extra` cells are in the interior of the list). basis : Basis Function space (copied from OptimizationProblem). Logically this is not a necessary input for this calculation---it is used only in a post-processing step to compute the objective-function gradient by projecting df/dEps onto the function space. It might make more sense to define the output of TimeStepper to be the raw df/dEps matrix, with the projection step done at a higher level, in which case this parameter could be omitted. But then the convergence of the adjoint timestepping run would have to consider the full raw df/dEps matrix instead of the lower-dimensional gradient vector. sim : `meep.Simulation` Simulation object, copied from OptimizationProblem. fwd_sources : list of `meep.Source` User-specified sources for the forward calculation. """ self.obj_func = obj_func self.dft_cells = dft_cells self.design_cell = dft_cells[-1] self.basis = basis self.sim = sim self.fwd_sources = fwd_sources self.dfdEps = None self.state = 'reset' def __update__(self, job): """ Recompute output quantities using most recent values of frequency-domain fields. This is an internal helper method for run() that computes the objective function value (forward run) or gradient (adjoint run) using the latest values of the frequency-domain fields stored in the DFT cells. During timestepping it is called every check_interval units of MEEP time once the excitation sources have died down. Parameters ---------- job : str 'forward' or 'adjoint' Returns ------- ** If job==`forward`: ** fq: numpy array of length N+1 storing values of the objective function and the N objective quantities [F, Q0, Q1, ... QN] ** If job==`adjoint`: ** df: numpy array of length `basis.dim` storing components of the objective-function gradient """ if job=='forward': retvals = self.obj_func(self.dft_cells) log(' ** {:10s}={:.5f} ** '.format('t',self.sim.round_time())) for n,v in zip(['f'] + self.obj_func.qnames, retvals): log(' ** {:10s}={:+.5e} '.format(n,v)) else: # job=='adjoint' EH_fwd = self.design_cell.get_EH_slices(label='forward') EH_adj = self.design_cell.get_EH_slices() self.dfdEps = np.zeros(self.design_cell.grid.shape) for n in [ n for (n,c) in enumerate(self.design_cell.components) if c in E_CPTS]: self.dfdEps += np.real( EH_fwd[n]*EH_adj[n] ) retvals = self.basis.project(self.dfdEps, grid=self.design_cell.grid, differential=True) return retvals ######################################################### # main timestepper routine that keeps going until the # relevant output quantity has converged #########################################################
[docs] def run(self, job): """Execute a forward or adjoint FDTD timestepping run and return results. Parameters ---------- job: str 'forward' or 'adjoint' """ self.prepare(job) last_source_time = self.fwd_sources[0].src.swigobj.last_time() max_time = adj_opt('dft_timeout')*last_source_time check_interval = adj_opt('dft_interval')*last_source_time reltol = adj_opt('dft_reltol') # configure real-time animations of evolving time-domain fields # step_funcs = [] # clist = adjoint_adj_opt('animate_components') # if clist is not None: # ivl=adjoint_adj_opt('animate_interval') # ivl=0.5 # step_funcs = [ AFEClient(self.sim, clist, interval=ivl) ] # phase 1: timestepping without interruption until the sources are extinguished prefix = CONSOLE_CODEWORD if adj_opt('silence_meep') else '' log("Beginning {} timestepping run...".format(job)) update_dashboard(['run {}'.format(job)]) global mt0, wt0, wtdb, wtcpu, proc dummy = proc.cpu_percent() stage, mta, mtb = 'SOURCES', 0, last_source_time update_dashboard(['stage {}'.format(stage), 'progress range {} {}'.format(int(mta),int(mtb))]) log('stepping from {} to {}'.format(mta,mtb)) # sys.stdout.write(prefix + '**stepping from {} to {}\n'.format(mta,mtb)) sys.stdout.write(prefix + termsty(job,'1') + ' ' \ + termsty(stage,'2') + ' ' \ + termsty('{:5.1f} -> t -> {:5.1f}'.format(mta,mtb),'3')) mt0, wt0 = mta, time.time() wtdb, wtcpu, dt = wt0, wt0, (mtb-mta)/100.0 self.sim.run(mp.at_every(dt, dashboard_sf), until=mtb) vals = self.__update__(job) # now continue timestepping with intermittent convergence checks until # we converge or timeout stage, max_rel_delta = 0, 1.0e9 while max_rel_delta>reltol and self.sim.round_time() < max_time: #check_time = self.sim.round_time() + check_interval #self.sim.run(*step_funcs, until = min(check_time, max_time)) mta, mtb, stage = mtb, min(mtb + check_interval, max_time), stage+1 update_dashboard(['stage conv {}'.format(stage), 'progress range {} {}'.format(int(mta),int(mtb))]) log('stepping from {} to {}'.format(mta,mtb)) # sys.stdout.write(prefix + '**stepping from {} to {}\n'.format(mta,mtb)) sys.stdout.write(prefix + termsty(job,'1') \ + ' ' + termsty('CONV {:<2d}'.format(stage),'2') \ + ' ' + termsty('[{:5.1f} -> t -> {:5.1f}]'.format(mta,mtb),'3') \ + ' ' + termsty('delta {:.2e}'.format(max_rel_delta),'4')) mt0, wt0 = mta, time.time() wtdb, wtcpu, dt = wt0, wt0, (mtb-mta)/100.0 self.sim.run(mp.at_every(dt, dashboard_sf), until=mtb) last_vals, vals = vals, self.__update__(job) rel_delta = np.array( [rel_diff(v,lv) for v,lv in zip(vals,last_vals)] ) max_rel_delta = np.amax(rel_delta) log(' ** t={} MRD={} ** '.format(self.sim.round_time(), max_rel_delta)) # for forward runs we save the converged DFT fields for later use if job=='forward': [ cell.save_fields('forward') for cell in self.dft_cells ] update_dashboard(['current {:.2f}'.format(np.real(vals[0]))]) self.state = job + '.complete' return vals
############################################################## ############################################################## ##############################################################
[docs] def prepare(self, job='forward'): """ Prepare simulation for timestepping by adding sources and DFT cells. Parameters ---------- job (str): The type of timestepping run to prepare: 1. If job=='forward', prepare forward run to compute objective function value. 2. If job=='adjoint', prepare adjoint run to compute objective function gradient. 3. If job is the name of an objective quantity, prepare adjoint run to compute the gradient of that quantity. """ target_state = job + '.prepared' if self.state == target_state: return # get lists of sources and DFT cells to register if job=='forward': sources = self.fwd_sources cells = self.dft_cells # for forward runs we must tabulate DFT fields in all DFT cells... cmplx = adj_opt('complex_fields') elif job=='adjoint' or job in self.obj_func.qnames: sources = self.get_adjoint_sources ( qname = job ) cells = [self.design_cell] # ...for adjoint runs we only need DFT fields in the design region cmplx = True else: raise ValueError('unknown job {} in TimeStepper.prepare'.format(job)) # place sources, register cells, initialize fields reuse_simulation = False if adj_opt('reuse_simulation'): self.sim.reset_meep() self.sim.change_sources(sources) else: cell_size, geometry = self.sim.cell_size, self.sim.geometry self.sim = mp.Simulation(resolution=self.sim.resolution, boundary_layers=self.sim.boundary_layers, cell_size=self.sim.cell_size, geometry=self.sim.geometry, sources=sources) self.force_complex_fields = cmplx self.sim.init_sim() for cell in cells: cell.register(self.sim) self.state = target_state
############################################################## ############################################################## ##############################################################
[docs] def get_adjoint_sources(self, qname=None): """ Construct adjoint source distribution. Return a list of mp.Source structures describing the distribution of sources appropriate for adjoint-method evaluation of the objective-function gradient df/deps. The optional parameter qname may be set to the name of an objective quantity Q, in which case we instead compute dQ/deps. """ ###################################################################### # extract the temporal envelope of the forward sources and use it to # set the overall amplitude prefactor for the adjoint sources ###################################################################### envelope = self.fwd_sources[0].src freq = envelope.frequency omega = 2.0*np.pi*freq factor = 2.0j*omega if callable(getattr(envelope, "fourier_transform", None)): factor /= envelope.fourier_transform(freq) ###################################################################### # make a list of (qrule, qweight) pairs for all objective quantities # that contribute with nonzero weight to the derivative in question ###################################################################### if qname is None or qname=='adjoint': dfdq = self.obj_func.get_dfdq() iwlist = [ (i,w) for i,w in enumerate(dfdq) if w is not 0.0 ] elif qname in self.obj_func.qnames: iwlist = [ (self.obj_func.qnames.index(qname), 1.0) ] else: warnings.warn('unknown objective quantity {} in get_adjoint_sources (skipping)'.format(qname)) return [] rwlist = [ (self.obj_func.qrules[i], w) for (i,w) in iwlist if w!=0.0 ] ###################################################################### # loop over all contributing objective quantities ###################################################################### sources = [] for (qrule, qweight) in rwlist: code, mode, cell = qrule.code, qrule.mode, self.dft_cells[qrule.ncell] EH = cell.get_eigenmode_slices(mode) if mode>0 else cell.get_EH_slices('forward') shape = [ len(tics) for tics in [cell.grid.xtics, cell.grid.ytics, cell.grid.ztics] ] if code in 'PM': sign = 1.0 if code=='P' else -1.0 signs = [ +1.0, -1.0, +1.0*sign, -1.0*sign ] sources += [ mp.Source(envelope, cell.components[3-nc], V3(cell.region.center), V3(cell.region.size), amplitude=signs[nc]*factor*qweight, amp_data=np.reshape(np.conj(EH[nc]),shape) ) for nc in range(len(cell.components)) ] return sources
def rel_diff(a,b): """Return value in range [0,2] quantifying error relative to magnitude.""" diff, scale = np.abs(a-b), np.amax([np.abs(a),np.abs(b)]) return 2. if np.isinf(scale) else 0. if scale==0. else diff/scale