Source code for zircon.injectors.base

"""

"""

from zircon.transformers.common import Unpickler
from zircon.subscribers.zeromq import ZMQSubscriber
from zircon.datastores.influx import InfluxDatastore


class Injector:
    """
    An Injector listens for data from a Reporter, feeds it through a row of
    Transformers, and inserts the result into a Datastore.

    When creating an Injector, you supply instances of a Subscriber, one or
    more Transformers, and a Datastore. If not specified, an unpickling
    Transformer and the default Subscriber and Datastore are used.

    **Usage**::

        injector = Injector(
            subscriber=MySubscriber(),
            transformers=[MyDecompressor(), MyFormatter(), ...],
            datastore=MyDatastore()
        )

    An Injector can be run as its own process::

        injector.run()

    Or stepped through by an external engine::

        injector.open()
        while not done:
            injector.step()
    """

    def __init__(self, subscriber=None, transformers=None, datastore=None):
        """
        Store the components and wire the Transformer chain together.

        :param subscriber: object providing ``open()`` and ``receive()``;
            a ``ZMQSubscriber`` is created when ``None``.
        :param transformers: sequence of objects providing
            ``set_callback(fn)`` and ``push(data)``; ``[Unpickler()]`` is used
            when ``None`` or empty.
        :param datastore: object providing ``insert(data)``; an
            ``InfluxDatastore`` is created when ``None``.
        """
        # Compare against None rather than relying on truthiness: a supplied
        # component that happens to be falsy (e.g. it defines __len__ and is
        # currently empty) must not be silently swapped for the default.
        if subscriber is not None:
            self.subscriber = subscriber
        else:
            self.subscriber = ZMQSubscriber()

        # Truthiness is intentional here: an empty chain cannot be wired up
        # (the last stage is indexed below), so it falls back to the default.
        # NOTE(review): the default Unpickler presumably unpickles whatever
        # the subscriber receives -- confirm the data source is trusted.
        if transformers:
            self.transformers = transformers
        else:
            self.transformers = [Unpickler()]

        if datastore is not None:
            self.datastore = datastore
        else:
            self.datastore = InfluxDatastore()

        # Each stage hands its output to the next stage's push(); the final
        # stage hands its output to the datastore.
        stages = self.transformers
        for upstream, downstream in zip(stages, stages[1:]):
            upstream.set_callback(downstream.push)
        stages[-1].set_callback(self.datastore.insert)

    def open(self):
        """
        Initialize the Subscriber.

        :return: ``True`` if the subscriber's ``open()`` returned a truthy
            value, ``False`` otherwise.
        """
        return bool(self.subscriber.open())

    def step(self):
        """
        Receive data and feed it into the first Transformer.

        Nothing is pushed when the subscriber returns ``None``.
        """
        msg = self.subscriber.receive()
        if msg is not None:
            self.transformers[0].push(msg)

    def run(self):
        """
        Initialize components and start listening.

        Prints a message and returns if initialization fails; otherwise calls
        ``step()`` in an endless loop.
        """
        if not self.open():
            print('Failed to initialize!')
            return

        while True:
            self.step()