// Process the myProp value (e.g. validation, convert to another type, ...)
// Store myProp for later retrieval by process() method this.myProp = myProp; }
@Override publicvoidstart() { // Initialize the connection to the external client }
@Override publicvoidstop() { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. }
@Override public Status process()throws EventDeliveryException { Statusstatus=null;
try { // This try clause includes whatever Channel/Event operations you want to do
// Receive new data Evente= getSomeData();
// Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e);
status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed
// Store myProp for later retrieval by process() method this.myProp = myProp; }
@Override publicvoidstart() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. }
@Override publicvoidstop() { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. }
/**
 * Takes one event from this sink's channel inside a channel transaction.
 *
 * <p>The transaction is committed when the try block completes normally and
 * rolled back when anything is thrown. It is closed in either case.
 *
 * @return {@code Status.READY} after a successful commit, or
 *         {@code Status.BACKOFF} when a non-{@link Error} throwable was
 *         caught and the transaction was rolled back
 * @throws EventDeliveryException declared by the overridden method; this
 *         template body does not throw it itself
 */
@Override
public Status process() throws EventDeliveryException {
  Status status = null;

  // Start transaction
  Channel ch = getChannel();
  Transaction txn = ch.getTransaction();
  txn.begin();
  try {
    // This try clause includes whatever Channel operations you want to do

    Event event = ch.take();

    // Send the Event to the external repository.
    // storeSomeData(event);

    txn.commit();
    status = Status.READY;
  } catch (Throwable t) {
    txn.rollback();

    // Log exception, handle individual exceptions as needed

    status = Status.BACKOFF;

    // re-throw all Errors
    if (t instanceof Error) {
      throw (Error) t;
    }
  } finally {
    // A transaction must be closed once it has been committed or rolled
    // back; otherwise it is never released and stays open on the channel.
    txn.close();
  }
  return status;
}
} // closes the enclosing class (its header is outside this excerpt)