gramex.services

Configure Gramex services.

Each key in gramex.yaml calls the corresponding function in this file. For example,

log:
    version: 1

… calls gramex.services.log() as log({"version": 1}). If no such function exists, a warning is raised.

version(conf)

Check if config version is supported. Currently, only 1.0 is supported

Source code in gramex\services\__init__.py
69
70
71
72
def version(conf: dict) -> None:
    '''Validate the `version:` section of gramex.yaml. Only 1.0 is supported.'''
    if conf == 1.0:
        return
    raise NotImplementedError(f'version: {conf} is not supported. Only 1.0')

log(conf)

Set up logging using Python’s standard logging.config.dictConfig()

Source code in gramex\services\__init__.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def log(conf: dict) -> None:
    '''Set up logging using Python's standard logging.config.dictConfig().

    Before applying the config, create the parent directories for every
    file-based handler that is actually referenced by the root logger or any
    named logger, so that `dictConfig()` does not fail on a missing folder.
    '''
    # Collect handlers referenced by the root logger and all named loggers
    active_handlers = set(conf.get('root', {}).get('handlers', []))
    for logger in conf.get('loggers', {}).values():
        active_handlers |= set(logger.get('handlers', []))
    # Create directories mentioned by active file handlers, if any
    for handler, handler_conf in conf.get('handlers', {}).items():
        if handler in active_handlers:
            filename = handler_conf.get('filename', None)
            if filename is not None:
                # Use the `filename` local (works for plain dicts too, unlike
                # the attribute access `handler_conf.filename`).
                folder = os.path.dirname(os.path.abspath(filename))
                try:
                    # exist_ok avoids a race between the exists() check and creation
                    os.makedirs(folder, exist_ok=True)
                except OSError:
                    app_log.exception(f'log: {handler}: cannot create folder {folder}')
    try:
        logging.config.dictConfig(conf)
    except (ValueError, TypeError, AttributeError, ImportError):
        app_log.exception('Error in log: configuration')

app(conf)

Set up tornado.web.Application() – only if the ioloop hasn’t started

Source code in gramex\services\__init__.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def app(conf: dict) -> "Callable[[], None] | None":
    '''Set up tornado.web.Application() -- only if the ioloop hasn't started.

    If the ioloop is already running, the config change is ignored and None is
    returned. Otherwise this creates `info.app`, binds the configured port, and
    returns a callback to be invoked after all services start; the callback
    opens the browser if configured, wires Ctrl-C / Ctrl-D / Ctrl-B handling,
    and starts the ioloop.
    '''
    ioloop = info.main_ioloop or tornado.ioloop.IOLoop.current()
    if ioloop_running(ioloop):
        app_log.warning('Ignoring app config change when running')
    else:
        # NOTE(review): conf appears to be an AttrDict -- attribute access like
        # conf.settings / conf.listen / conf.browser is used throughout. Confirm.
        info.app = GramexApp(**conf.settings)
        try:
            info.app.listen(**conf.listen)
        except socket.error as e:
            # "Address already in use" has OS-specific errno values
            port_used_codes = {'windows': 10048, 'linux': 98}
            if e.errno not in port_used_codes.values():
                raise
            logging.error(f'Port {conf.listen.port} is busy. Use --listen.port=<new-port>')
            sys.exit(1)

        def callback():
            '''Called after all services are started. Opens browser if required'''
            if ioloop_running(ioloop):
                return

            # If enterprise version is installed, user must accept license
            try:
                import gramexenterprise  # noqa

                gramex.license.accept()
            except ImportError:
                pass

            app_log.info(f'Listening on port {conf.listen.port}')
            app_log_extra['port'] = conf.listen.port
            msg = f'Gramex {__version__} listening on http://127.0.0.1:{conf.listen.port}/. '

            # browser: True opens the application home page on localhost.
            # browser: url opens the application to a specific URL
            url = f'http://127.0.0.1:{conf.listen.port}/'
            if conf.browser:
                if isinstance(conf.browser, str):
                    url = urljoin(url, conf.browser)
                try:
                    browser = webbrowser.get()
                    msg += f'Opening {url} in {browser.__class__.__name__} browser'
                    browser.open(url)
                except webbrowser.Error:
                    msg += 'Unable to open browser'
            else:
                msg += '<Ctrl-B> opens browser, <Ctrl-D> starts debugger'

            console(msg)

            # Ensure that we call shutdown() on Ctrl-C.
            # On Windows, Tornado does not exit on Ctrl-C. This also fixes that.
            # When Ctrl-C is pressed, signal_handler() sets _exit to [True].
            # check_exit() periodically watches and calls shutdown().
            # But signal handlers can only be set in the main thread.
            # So ignore if we're not in the main thread (e.g. for nosetests, Windows service)
            #
            # Note: The PeriodicCallback takes up a small amount of CPU time.
            # Note: getch() doesn't handle keyboard buffer queue.
            # Note: This is no guarantee that shutdown() will be called.
            if isinstance(threading.current_thread(), threading._MainThread):
                exit = [False]

                def check_exit():
                    # Runs every 500ms: watch the Ctrl-C flag and poll keypresses
                    if exit[0] is True:
                        shutdown()
                    # If Ctrl-D is pressed, run the Python debugger
                    char = debug.getch()
                    if char == b'\x04':
                        breakpoint()  # noqa: T100
                    # If Ctrl-B is pressed, start the browser
                    if char == b'\x02':
                        browser = webbrowser.get()
                        browser.open(url)

                def signal_handler(signum, frame):
                    # Only sets a flag; the actual shutdown happens in check_exit()
                    exit[0] = True

                try:
                    signal.signal(signal.SIGINT, signal_handler)
                except ValueError:
                    # When running as a Windows Service (winservice.py), python
                    # itself is on a thread, I think. So ignore the
                    # ValueError: signal only works in main thread.
                    pass
                else:
                    tornado.ioloop.PeriodicCallback(check_exit, callback_time=500).start()

            info.main_ioloop = ioloop
            ioloop.start()

        return callback

schedule(conf)

Set up the Gramex scheduler

Source code in gramex\services\__init__.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def schedule(conf: dict) -> None:
    '''Set up the Gramex scheduler'''
    # Create tasks running on ioloop for the given schedule, store it in info.schedule
    from . import scheduler

    _stop_all_tasks(info.schedule)
    for name, spec in conf.items():
        key = cache_key('schedule', spec)
        if key in _cache:
            # Unchanged config: re-use the cached task and re-arm its timer
            task = _cache[key]
            info.schedule[name] = task
            task.call_later()
            continue
        try:
            app_log.info(f'Initialising schedule:{name}')
            task = scheduler.Task(name, spec, info.threadpool, ioloop=info.main_ioloop)
            _cache[key] = info.schedule[name] = task
        except Exception as e:
            app_log.exception(e)

alert(conf)

Sets up the alert service

Source code in gramex\services\__init__.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def alert(conf: dict) -> None:
    '''
    Sets up the alert service
    '''
    from . import scheduler

    _stop_all_tasks(info.alert)
    # Keys that describe WHEN the alert runs, as opposed to what it mails
    schedule_keys = ['minutes', 'hours', 'dates', 'months', 'weekdays', 'years', 'startup', 'utc']

    for name, spec in conf.items():
        key = cache_key('alert', spec)
        if key in _cache:
            # Unchanged config: re-use the cached task and re-arm its timer
            task = _cache[key]
            info.alert[name] = task
            task.call_later()
            continue
        app_log.info(f'Initialising alert: {name}')
        # Split out the scheduling part of the alert config
        sched = {k: spec[k] for k in schedule_keys if k in spec}
        if 'thread' in spec:
            sched['thread'] = spec['thread']
        # create_alert() returns None on invalid configs -- skip those
        sched['function'] = create_alert(name, spec)
        if sched['function'] is None:
            continue
        try:
            task = scheduler.Task(name, sched, info.threadpool, ioloop=info.main_ioloop)
            _cache[key] = info.alert[name] = task
        except Exception:
            app_log.exception(f'Failed to initialize alert: {name}')

threadpool(conf)

Set up a global threadpool executor

Source code in gramex\services\__init__.py
241
242
243
244
245
246
247
248
def threadpool(conf: dict) -> None:
    '''Set up a global threadpool executor'''
    # Default to a single worker unless the config provides workers:
    if conf and hasattr(conf, 'get'):
        workers = conf.get('workers', 1)
    else:
        workers = 1
    info.threadpool = concurrent.futures.ThreadPoolExecutor(workers)
    # Shut the pool down cleanly when the process exits
    atexit.register(info.threadpool.shutdown)

url(conf)

Set up the tornado web app URL handlers

Source code in gramex\services\__init__.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def url(conf: dict) -> None:
    '''Set up the tornado web app URL handlers.

    For each named spec, builds a handler subclass with `name`/`conf`/`cache`
    class attributes, calls its setup(), wraps it in a tornado URLSpec, and
    registers all of them on `info.app`. Invalid specs are logged and skipped.
    '''
    info.url = AttrDict()
    # Sort the handlers in descending order of priority
    specs = sorted(conf.items(), key=_sort_url_patterns, reverse=True)
    for name, spec in specs:
        # Re-use a previously built URLSpec when the config is unchanged
        _key = cache_key('url', spec)
        if _key in _cache:
            info.url[name] = _cache[_key]
            continue
        if 'pattern' not in spec:
            app_log.error(f'url:{name}: no pattern: specified')
            continue
        # service: is an alias for handler: and has higher priority
        if 'service' in spec:
            spec.handler = spec.service
        if 'handler' not in spec:
            app_log.error(f'url:{name}: no service: or handler: specified')
            continue
        # handler: gramex.handlers.FunctionHandler is valid.
        # But on Windows, PyDoc locates it as the module gramex.handlers.functionhandler.py.
        # So explicitly strip out `gramex.handlers.` prefix if provided.
        if spec.handler.startswith('gramex.handlers.'):
            spec.handler = spec.handler.replace('gramex.handlers.', '')
        app_log.debug(f'url:{name} ({spec.handler}) {spec.get("priority", "")}')
        handler_class = locate(str(spec.handler), modules=['gramex.handlers'])
        if handler_class is None:
            app_log.error(f'url:{name}: ignoring missing handler {spec.handler}')
            continue

        # Create a subclass of the handler with additional attributes.
        class_vars = {'name': name, 'conf': spec}
        # If there's a cache section, get the cache method for use by BaseHandler
        if 'cache' in spec:
            class_vars['cache'] = _cache_generator(spec['cache'], name=name)
        else:
            class_vars['cache'] = None
        handler = type(spec.handler, (handler_class,), class_vars)

        # Ensure that there's a kwargs: dict in the spec
        spec.setdefault('kwargs', AttrDict())
        if not isinstance(spec.kwargs, dict):
            app_log.error(f'url:{name} kwargs must be a dict, not {spec.kwargs!r}')
            spec.kwargs = AttrDict()
        # If there's a setup method, call it to initialize the class
        if hasattr(handler_class, 'setup'):
            try:
                handler.setup_default_kwargs()  # Updates spec.kwargs with base handlers
                handler.setup(**spec.kwargs)
            except Exception:
                app_log.exception(f'url:{name} ({spec.handler}) invalid configuration')
                # Since we can't set up the handler, all requests must report the error instead
                class_vars['exc_info'] = sys.exc_info()
                error_handler = locate('SetupFailedHandler', modules=['gramex.handlers'])
                handler = type(spec.handler, (error_handler,), class_vars)
                spec.kwargs = {}
                handler.setup(**spec.kwargs)

        try:
            handler_entry = tornado.web.URLSpec(
                name=name,
                pattern=_url_normalize(spec.pattern),
                handler=handler,
                kwargs=spec.kwargs,
            )
        except re.error:
            # Bad regex in pattern: -- report and skip this URL
            app_log.error(f'url:{name}: pattern: {spec.pattern!r} is invalid')
            continue
        except Exception:
            app_log.exception(f'url:{name}: setup failed')
            continue
        info.url[name] = _cache[_key] = handler_entry

    # Replace all existing handlers with the freshly built set
    info.app.clear_handlers()
    info.app.add_handlers('.*$', info.url.values())

mime(conf)

Set up MIME types

Source code in gramex\services\__init__.py
328
329
330
331
def mime(conf: dict) -> None:
    '''Register custom MIME types: maps each file extension to a content type.'''
    for extension, content_type in conf.items():
        mimetypes.add_type(content_type, extension, strict=True)

watch(conf)

Set up file watchers

Source code in gramex\services\__init__.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def watch(conf: dict) -> None:
    '''Set up file watchers.

    Each watch: entry needs a `paths:` key and at least one event callback
    (on_modified, on_created, on_deleted, on_moved, on_any_event). String
    callbacks are resolved into callables via locate().
    '''
    from . import watcher

    # The supported event-callback keys in each watch: config
    events = {'on_modified', 'on_created', 'on_deleted', 'on_moved', 'on_any_event'}
    for name, config in conf.items():
        _key = cache_key('watch', config)
        if _key in _cache:
            # Unchanged config: re-register the cached (already-resolved) config
            watcher.watch(name, **_cache[_key])
            continue
        if 'paths' not in config:
            app_log.error(f'watch:{name} has no "paths"')
            continue
        if not set(config.keys()) & events:
            app_log.error(f'watch:{name} has no events (on_modified, ...)')
            continue
        # Allow a single path; normalize to a list
        if not isinstance(config['paths'], (list, set, tuple)):
            config['paths'] = [config['paths']]
        for event in events:
            if event in config and not callable(config[event]):
                # Resolve dotted-path strings into callables
                config[event] = locate(config[event], modules=['gramex.transforms'])
                if not callable(config[event]):
                    app_log.error(f'watch:{name}.{event} is not callable')
                    # Fall back to a no-op so the watch still registers
                    config[event] = lambda event: None
        _cache[_key] = config
        watcher.watch(name, **_cache[_key])

cache(conf)

Set up caches

Source code in gramex\services\__init__.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def cache(conf: dict) -> None:
    '''Set up named caches in `info.cache`.

    Supports `type: memory` (in-process LRU), `type: disk` (diskcache on a
    path), and `type: redis`. A cache with `default: true` also replaces the
    caches used by `gramex.cache.open` / `gramex.cache.query`, migrating
    existing entries.
    '''
    for name, config in conf.items():
        cache_type = config['type']
        if cache_type not in _cache_defaults:
            # Use the `cache_type` local, not `config.type`: attribute access
            # would fail if `config` is a plain dict rather than an AttrDict.
            app_log.warning(f'cache:{name} has unknown type {cache_type}')
            continue
        # Fill in per-type defaults (e.g. size) without overriding the config
        config = merge(dict(config), _cache_defaults[cache_type], mode='setdefault')
        if cache_type == 'memory':
            info.cache[name] = urlcache.MemoryCache(
                maxsize=config['size'], getsizeof=gramex.cache.sizeof
            )
        elif cache_type == 'disk':
            path = config.get('path', '.cache-' + name)
            info.cache[name] = urlcache.DiskCache(
                path, size_limit=config['size'], eviction_policy='least-recently-stored'
            )
            # Flush the disk cache cleanly on exit
            atexit.register(info.cache[name].close)
        elif cache_type == 'redis':
            path = config['path'] if 'path' in config else None
            try:
                info.cache[name] = urlcache.RedisCache(path=path, maxsize=config['size'])
            except Exception:
                app_log.exception(f'cache:{name} cannot connect to redis')
        # if default: true, make this the default cache for gramex.cache.{open,query}
        if config.get('default'):
            for key in ['_OPEN_CACHE', '_QUERY_CACHE']:
                old_cache = getattr(gramex.cache, key, {})
                # Migrate cache and clear it
                for k in old_cache.keys():
                    try:
                        info.cache[name].set(k, old_cache.get(k, None))
                    except Exception:
                        # Don't f-string the `k` into the message. It might contain a %s
                        app_log.exception(f"cache:{name} can't migrate %r", k)
                old_cache.clear()
                setattr(gramex.cache, key, info.cache[name])

eventlog(conf)

Set up the application event logger

Source code in gramex\services\__init__.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def eventlog(conf: dict) -> None:
    '''Set up the application event logger.

    Creates a SQLite event log at conf.path, exposes `info.eventlog.query` and
    `info.eventlog.add`, records a startup event, and registers a shutdown
    event via atexit. Does nothing if no path: is configured.
    '''
    if not conf.path:
        return

    import time
    import sqlite3

    # Ensure the folder for the SQLite file exists
    folder = os.path.dirname(os.path.abspath(conf.path))
    if not os.path.exists(folder):
        os.makedirs(folder)

    def query(q, *args, **kwargs):
        # Run a SQL statement and return all rows. A fresh connection per call
        # (check_same_thread=False) keeps this usable from worker threads.
        conn = sqlite3.connect(conf.path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        result = list(conn.execute(q, *args, **kwargs))
        conn.commit()
        conn.close()
        return result

    def add(event_name, data):
        '''Write a message into the application event log'''
        data = json.dumps(data, ensure_ascii=True, separators=(',', ':'))
        query('INSERT INTO events VALUES (?, ?, ?)', [time.time(), event_name, data])

    def shutdown():
        # Record a shutdown event when the process exits (via atexit)
        add('shutdown', {'version': __version__, 'pid': os.getpid()})
        # Don't conn.close() here. gramex.gramex_update() runs in a thread. If we start and
        # stop gramex quickly, allow gramex_update to add this entry too

    info.eventlog.query = query
    info.eventlog.add = add

    query('CREATE TABLE IF NOT EXISTS events (time REAL, event TEXT, data TEXT)')
    add(
        'startup',
        {'version': __version__, 'pid': os.getpid(), 'args': sys.argv, 'cwd': os.getcwd()},
    )
    atexit.register(shutdown)

email(conf)

Set up email service

Source code in gramex\services\__init__.py
455
456
457
458
459
460
461
462
def email(conf: dict) -> None:
    '''Set up email service'''
    # Build one SMTPMailer per named service. Cache by config so that a
    # reload with an unchanged config re-uses the existing mailer.
    for name, config in conf.items():
        key = cache_key('email', config)
        if key not in _cache:
            _cache[key] = SMTPMailer(**config)
        info.email[name] = _cache[key]

handlers(conf)

Set up handlers service.

This holds default configurations for handlers configured by gramex.services.url, e.g. BaseHandler errors, FileHandler ignores, etc.

Source code in gramex\services\__init__.py
488
489
490
491
492
493
def handlers(conf: dict) -> None:
    '''Set up handlers service.

    This holds default configurations for handlers configured by [gramex.services.url][],
    e.g. BaseHandler errors, FileHandler ignores, etc. No setup is required here;
    the configuration is read directly by the handlers themselves.'''

test(conf)

Set up test service

Source code in gramex\services\__init__.py
500
501
502
503
504
def test(conf: dict) -> None:
    '''Set up test service'''
    # Drop the auth: section when running Gramex, so that any passwords stored
    # there are never kept in memory.
    conf.pop('auth', None)

gramexlog(conf)

Set up gramexlog service

Source code in gramex\services\__init__.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
def gramexlog(conf: dict) -> None:
    '''Set up gramexlog service: queue log entries and bulk-push them to Elasticsearch.'''
    from gramex.transforms import build_log_info

    try:
        from elasticsearch7 import Elasticsearch, helpers
    except ImportError:
        app_log.error('gramexlog: elasticsearch7 missing. pip install elasticsearch7')
        return

    # We call push() every 'flush' seconds on the main IOLoop. Defaults to every 5 seconds
    flush = conf.pop('flush', 5)
    ioloop = info.main_ioloop or tornado.ioloop.IOLoop.current()
    # Set the defaultapp to the first config key under gramexlog:
    if conf:
        info.gramexlog.defaultapp = next(iter(conf.keys()))
    for app, app_conf in conf.items():
        app_config = info.gramexlog.apps[app] = AttrDict()
        # queue buffers log entries until the next push()
        app_config.queue = []
        keys = app_conf.pop('keys', [])
        # If user specifies keys: [port, args.x, ...], these are captured as additional keys.
        # The keys use same spec as Gramex logging.
        app_config.extra_keys = build_log_info(keys)
        # Ensure all gramexlog keys are popped from app_conf, leaving only Elasticsearch keys
        app_config.conn = Elasticsearch(**app_conf)

    def push():
        # Flush each app's queued entries to Elasticsearch, then reschedule.
        for app, app_config in info.gramexlog.apps.items():
            for item in app_config.queue:
                # NOTE(review): 'index' is looked up on app_config, which only
                # ever holds queue/extra_keys/conn -- so this always falls back
                # to the app name. Confirm whether an index: from app_conf was
                # intended here.
                item['_index'] = app_config.get('index', app)
            try:
                helpers.bulk(app_config.conn, app_config.queue)
                app_config.queue.clear()
            except Exception:
                # TODO: If the connection broke, re-create it
                # This generic exception should be caught for thread to continue its execution
                app_log.exception(f'gramexlog: push to {app} failed')
        if 'handle' in info.gramexlog:
            ioloop.remove_timeout(info.gramexlog.handle)
        # Call again after flush seconds
        info.gramexlog.handle = ioloop.call_later(flush, push)

    info.gramexlog.handle = ioloop.call_later(flush, push)
    info.gramexlog.push = push

storelocations(conf)

Initialize the store locations.

gramex.service.storelocation[location] holds a database storage location that works with gramex.data.alter. It MUST have the following keys:

  • url: SQLAlchemy URL
  • table: table name
  • columns: column names, with values as SQL types, or dicts
Source code in gramex\services\__init__.py
553
554
555
556
557
558
559
560
561
562
563
564
565
def storelocations(conf: dict) -> None:
    '''Initialize the store locations.

    Each entry is saved in `info.storelocations[key]` and describes a database
    storage location that works with [`gramex.data.alter`][gramex.data.alter].
    It MUST have the following keys:

    - `url`: SQLAlchemy URL
    - `table`: table name
    - `columns`: column names, with values as SQL types, or dicts
    '''
    for location, spec in conf.items():
        info.storelocations[location] = spec
        # Create or migrate the backing table so the location is ready to use
        gramex.data.alter(**spec)

get_mailer(config, name='')

Return the email service config and corresponding mailer for a given config.

Source code in gramex\services\__init__.py
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def get_mailer(config, name=''):
    '''Return the email service config and corresponding mailer for a given config.'''
    if config.get('service', None) is None:
        # No service: specified -- fall back to the first configured email service
        if not info.email:
            app_log.error(f'{name}: define an email: service to use')
            return None, None
        service = config['service'] = next(iter(info.email))
        app_log.warning(f'{name}: using first email service: {service}')
    service = config['service']
    mailer = info.email.get(service, None)
    if mailer is None:
        app_log.error(f'{name}: undefined email service: {service}')
        return None, None
    return service, mailer

create_mail(data, config, name)

Return kwargs that can be passed to a mailer.mail

Source code in gramex\services\__init__.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
def create_mail(data, config, name):
    '''Return kwargs that can be passed to a mailer.mail.

    `data` supplies template variables, `config` is the alert/email config,
    and `name` is used only in error messages.
    '''
    mail = {}
    # bodyfile/htmlfile/markdownfile: template the *path*, load that file as a
    # template, and render it into body/html/markdown (unless already given)
    for key in ['bodyfile', 'htmlfile', 'markdownfile']:
        target = key.replace('file', '')
        if key in config and target not in config:
            path = _tmpl(config[key]).generate(**data).decode('utf-8')
            tmpl = gramex.cache.open(path, 'template')
            mail[target] = tmpl.generate(**data).decode('utf-8')
    # Address fields and inline content are themselves templates (str or list)
    for key in _addr_fields + ['subject', 'body', 'html', 'markdown']:
        if key not in config:
            continue
        if isinstance(config[key], list):
            mail[key] = [_tmpl(v).generate(**data).decode('utf-8') for v in config[key]]
        else:
            mail[key] = _tmpl(config[key]).generate(**data).decode('utf-8')
    headers = {}
    # user: {id: ...} creates an X-Gramex-User header to mimic the user
    if 'user' in config:
        user = deepcopy(config['user'])
        for key, val, node in walk(user):
            node[key] = _tmpl(val).generate(**data).decode('utf-8')
        user = json.dumps(user, ensure_ascii=True, separators=(',', ':'))
        headers['X-Gramex-User'] = tornado.web.create_signed_value(
            info.app.settings['cookie_secret'], 'user', user
        )
    # Rendered markdown becomes the html part
    if 'markdown' in mail:
        mail['html'] = _markdown_convert(mail.pop('markdown'))
    if 'images' in config:
        mail['images'] = {}
        for cid, val in config['images'].items():
            urlpath = _tmpl(val).generate(**data).decode('utf-8')
            urldata = urlfetch(urlpath, info=True, headers=headers)
            if urldata['content_type'].startswith('image/'):
                mail['images'][cid] = urldata['name']
            else:
                # Not an image: log the first few bytes to help diagnose
                with io.open(urldata['name'], 'rb') as temp_file:
                    bytestoread = 80
                    first_line = temp_file.read(bytestoread)
                # TODO: let admin know that the image was not processed
                app_log.error(
                    f'{name}: {cid}: {urldata["r"].status_code} '
                    f'({urldata["content_type"]}) not an image: {urlpath}\n'
                    f'{first_line!r}'
                )
    # attachments: template each path/URL and fetch it to a local file
    if 'attachments' in config:
        mail['attachments'] = [
            urlfetch(_tmpl(v).generate(**data).decode('utf-8'), headers=headers)
            for v in config['attachments']
        ]
    return mail

create_alert(name, alert)

Generate the function to be run by alert() using the alert configuration

Source code in gramex\services\__init__.py
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
def create_alert(name, alert):
    '''Generate the function to be run by alert() using the alert configuration.

    Validates the alert config (recipients, content types, data:, each:),
    pre-compiles the condition, and returns a `run_alert(callback, args)`
    closure. Returns None if the config is invalid.
    '''
    # Configure email service
    service, mailer = get_mailer(alert, name=f'alert: {name}')

    # - Warn if to, cc, bcc exists and is not a string or list of strings. Ignore incorrect
    #    - if to: [1, 'user@example.org'], then
    #    - log a warning about the 1. Drop the 1. to: becomes ['user@example.org']

    # Error if to, cc, bcc are all missing, return None
    if not any(key in alert for key in ['to', 'cc', 'bcc']):
        app_log.error(f'alert: {name}: missing to/cc/bcc')
        return
    # Ensure that config has the right type (str, dict, list)
    contentfields = ['body', 'html', 'bodyfile', 'htmlfile', 'markdown', 'markdownfile']
    for key in ['subject'] + _addr_fields + contentfields:
        if not isinstance(alert.get(key, ''), (str, list)):
            app_log.error(f'alert: {name}.{key}: {alert[key]!r} must be a list or str')
            return
    if not isinstance(alert.get('images', {}), dict):
        app_log.error(f'alert: {name}.images: {alert["images"]!r} is not a dict')
        return
    if not isinstance(alert.get('attachments', []), list):
        app_log.error(f'alert: {name}.attachments: {alert["attachments"]!r} is not a list')
        return

    # Warn if subject is missing
    if 'subject' not in alert:
        app_log.warning(f'alert: {name}: missing subject')

    # Warn if body, html, bodyfile, htmlfile keys are missing
    if not any(key in alert for key in contentfields):
        app_log.warning(f'alert: {name}: missing body/html/bodyfile/htmlfile/...')

    # Pre-compile data.
    #   - `data: {key: [...]}` -- loads data in-place
    #   - `data: {key: {url: file}}` -- loads from a file
    #   - `data: {key: {url: sqlalchemy-url, table: table}}` -- loads from a database
    #   - `data: file` -- same as `data: {data: {url: file}}`
    #   - `data: {key: file}` -- same as `data: {key: {url: file}}`
    #   - `data: [...]` -- same as `data: {data: [...]}`
    datasets = {}
    if 'data' in alert:
        if isinstance(alert['data'], str):
            datasets = {'data': {'url': alert['data']}}
        elif isinstance(alert['data'], list):
            datasets = {'data': alert['data']}
        elif isinstance(alert['data'], dict):
            for key, dataset in alert['data'].items():
                if isinstance(dataset, str):
                    datasets[key] = {'url': dataset}
                elif isinstance(dataset, list) or 'url' in dataset:
                    datasets[key] = dataset
                else:
                    app_log.error(f'alert: {name}.data: {key} is missing url:')
        else:
            app_log.error(f'alert: {name}.data: must be data file or dict. Not {alert["data"]!r}')

    if 'each' in alert and alert['each'] not in datasets:
        app_log.error(f'alert: {name}.each: {alert["each"]} is not in data:')
        return

    # The condition sees every dataset plus config/args as variables
    vars = {key: None for key in datasets}
    vars.update({'config': None, 'args': None})
    condition = build_transform(
        {'function': alert.get('condition', 'True')},
        filename=f'alert: {name}',
        vars=vars,
        iter=False,
    )

    alert_logger = logging.getLogger('gramex.alert')

    def load_datasets(data, each):
        '''
        Modify data by load datasets and filter by condition.
        Modify each to apply the each: argument, else return (None, None)
        '''
        for key, val in datasets.items():
            # Allow raw data in lists as-is. Treat dicts as {url: ...}
            data[key] = val if isinstance(val, list) else gramex.data.filter(**val)
        result = condition(**data)
        # Avoiding isinstance(result, pd.DataFrame) to avoid importing pandas
        if type(result).__name__ == 'DataFrame':
            data['data'] = result
        elif isinstance(result, dict):
            data.update(result)
        elif not result:
            app_log.debug(f'alert: {name} stopped. condition = {result}')
            return
        if 'each' in alert:
            each_data = data[alert['each']]
            if isinstance(each_data, dict):
                each += list(each_data.items())
            elif isinstance(each_data, list):
                each += list(enumerate(each_data))
            elif hasattr(each_data, 'iterrows'):
                each += list(each_data.iterrows())
            else:
                # FIX: the second string was missing the f prefix, so the error
                # message printed the literal '{type(each_data)}'
                raise ValueError(
                    f'alert: {name}: each: data.{alert["each"]} must be '
                    + f'dict/list/DF, not {type(each_data)}'
                )
        else:
            each.append((0, None))

    def run_alert(callback=None, args=None):
        '''
        Runs the configured alert. If a callback is specified, calls the
        callback with all email arguments. Else sends the email.
        If args= is specified, add it as data['args'].
        '''
        app_log.info(f'alert: {name} running')
        data, each, fail = {'config': alert, 'args': {} if args is None else args}, [], []
        try:
            load_datasets(data, each)
        except Exception as e:
            app_log.exception(f'alert: {name} data processing failed')
            fail.append({'error': e})

        retval = []
        for index, row in each:
            data['index'], data['row'], data['config'] = index, row, alert
            try:
                retval.append(
                    AttrDict(index=index, row=row, mail=create_mail(data, alert, f'alert: {name}'))
                )
            except Exception as e:
                app_log.exception(f'alert: {name}[{index}] templating (row={row!r})')
                fail.append({'index': index, 'row': row, 'error': e})

        callback = mailer.mail if not callable(callback) else callback
        done = []
        for v in retval:
            try:
                callback(**v.mail)
            except Exception as e:
                fail.append({'index': v.index, 'row': v.row, 'mail': v.mail, 'error': e})
                app_log.exception(f'alert: {name}[{v.index}] delivery (row={v.row!r})')
            else:
                done.append(v)
                event = {
                    'alert': name,
                    'service': service,
                    'from': mailer.email or '',
                    'to': '',
                    'cc': '',
                    'bcc': '',
                    'subject': '',
                    'datetime': datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ"),
                }
                event.update({k: v for k, v in v.mail.items() if k in event})
                event['attachments'] = ', '.join(v.mail.get('attachments', []))
                alert_logger.info(event)

        # Run notifications
        args = {'done': done, 'fail': fail}
        for notification_name in alert.get('notify', []):
            notify = info.alert.get(notification_name)
            if notify is not None:
                notify.run(callback=callback, args=args)
            else:
                app_log.error(f'alert: {name}.notify: alert {notification_name} not defined')
        return args

    return run_alert