esppy.connectors.KafkaSubscriber

class esppy.connectors.KafkaSubscriber(kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, numbufferedmsgs=None, name=None, is_active=None, snapshot=None, collapse=None, rmretdel=None, hotfailover=None, dateformat=None, protofile=None, protomsg=None, csvincludeschema=None, useclientmsgid=None, configfilesection=None, zookeeperhostport=None, kafkaglobalconfig=None, kafkatopicconfig=None, csvmsgperevent=None, csvmsgpereventblock=None)

Bases: esppy.connectors.base.Connector

Subscribe to events from a Kafka broker

Parameters
kafkahostport : string

Specifies one or more Kafka brokers in the following form: ‘host:port,host:port,…’.

kafkatopic : string

Specifies the Kafka topic

urlhostport : string

Specifies the host:port field in the metadata topic that is subscribed to on start-up.

kafkapartition : string, optional

Specifies the Kafka partition

kafkatype : string, optional

Specifies binary, CSV, JSON, or the name of a string field in the subscribed window schema.

numbufferedmsgs : int, optional

Specifies the maximum number of messages buffered by a standby subscriber connector.

name : string, optional

The name of the connector object

snapshot : bool, optional

Specifies whether to send snapshot data

collapse : bool, optional

Enables conversion of UPDATE_BLOCK events to make subscriber output publishable.

rmretdel : bool, optional

Removes, from the event blocks received by the subscriber, all delete events that were introduced by a window retention policy.

hotfailover : bool, optional

Enables hot failover mode

dateformat : string, optional

Specifies the format of ESP_DATETIME and ESP_TIMESTAMP fields in CSV events.

protofile : string, optional

Specifies the .proto file that contains the Google Protocol Buffers message definition used to convert event blocks to protobuf messages.

protomsg : string, optional

Specifies the name of a Google Protocol Buffers message in the .proto file that you specified with the protofile parameter.

csvincludeschema : string, optional

When kafkatype=CSV, specifies when to prepend output CSV data with the window’s serialized schema. Valid values: ‘never’, ‘once’, and ‘pereventblock’

useclientmsgid : bool, optional

Uses the client-generated message ID instead of the engine-generated message ID when performing a failover operation and extracting a message ID from an event block.

configfilesection : string, optional

Specifies the name of the section in the connector config file to parse for configuration parameters.

zookeeperhostport : string, optional

Specifies the ZooKeeper server in the form ‘host:port’.

kafkaglobalconfig : string, optional

Specifies a semicolon-separated list of ‘key=value’ strings to configure librdkafka global configuration values.

kafkatopicconfig : string, optional

Specifies a semicolon-separated list of ‘key=value’ strings to configure librdkafka topic configuration values.

csvmsgperevent : bool, optional

For CSV, specifies to send one message per event

csvmsgpereventblock : bool, optional

For CSV, specifies to send one message per event block

Returns
KafkaSubscriber
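
Several of the parameters above are plain strings with a fixed layout: kafkahostport is a comma-separated list of host:port pairs, and kafkaglobalconfig and kafkatopicconfig are semicolon-separated ‘key=value’ lists. The sketch below shows one way to assemble those strings before passing them to the constructor. The helper names, host names, topic, and librdkafka settings are hypothetical and chosen only for illustration, and the casing of the kafkatype value follows the description above rather than a verified server requirement. The constructor call is skipped when esppy is not installed.

```python
def format_brokers(brokers):
    """Join (host, port) pairs into the 'host:port,host:port' form."""
    return ",".join(f"{host}:{port}" for host, port in brokers)


def format_rdkafka_config(options):
    """Join a dict into a semicolon-separated list of 'key=value' strings."""
    return ";".join(f"{key}={value}" for key, value in options.items())


# Hypothetical values; replace them with your own brokers, topic, and settings.
subscriber_args = {
    "kafkahostport": format_brokers(
        [("broker1.example.com", 9092), ("broker2.example.com", 9092)]
    ),
    "kafkatopic": "trades",
    "urlhostport": "esp.example.com:5555",
    "kafkatype": "JSON",
    "kafkaglobalconfig": format_rdkafka_config(
        {"socket.keepalive.enable": "true", "queue.buffering.max.ms": 100}
    ),
    "snapshot": True,
}

try:
    from esppy.connectors import KafkaSubscriber
except ImportError:
    # esppy is not available; only the argument dictionary is built.
    KafkaSubscriber = None

if KafkaSubscriber is not None:
    # Keyword names match the constructor signature documented above.
    subscriber = KafkaSubscriber(**subscriber_args)
```

Building the strings from structured values keeps separator mistakes (a stray space, a comma where a semicolon belongs) out of the connector definition.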
__init__(self, kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, numbufferedmsgs=None, name=None, is_active=None, snapshot=None, collapse=None, rmretdel=None, hotfailover=None, dateformat=None, protofile=None, protomsg=None, csvincludeschema=None, useclientmsgid=None, configfilesection=None, zookeeperhostport=None, kafkaglobalconfig=None, kafkatopicconfig=None, csvmsgperevent=None, csvmsgpereventblock=None)

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__(self[, kafkahostport, kafkatopic, …])

Initialize self.

clear(self)

copy(self[, deep])

Return a copy of the object

from_element(data[, session])

Construct connector from XML definition

from_parameters(conncls[, type, name, …])

from_xml(data[, session])

Construct connector from XML definition

get(self, key[, default])

items(self)

keys(self)

pop(self, key[, default])

Remove the specified key and return the corresponding value. If key is not found, default is returned if given; otherwise KeyError is raised.

popitem(self)

Remove and return some (key, value) pair as a 2-tuple; raise KeyError if the connector is empty.

set_properties(self, **kwargs)

Set connector properties

setdefault(self, key[, default])

to_element(self)

Export connector definition to ElementTree.Element

to_xml(self[, pretty])

Export connector definition to XML

update(*args, **kwds)

Update the connector D from a mapping or iterable E and from keyword arguments F. If E is present and has a .keys() method, does: for k in E: D[k] = E[k]. If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v. In either case, this is followed by: for k, v in F.items(): D[k] = v.

values(self)
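
The methods get, pop, popitem, setdefault, and update listed above carry the standard Python mapping docstrings. The sketch below uses a plain dict as a stand-in to illustrate those semantics, on the assumption that a connector object behaves the same way for its properties. The property values are hypothetical.

```python
# A plain dict stands in for a connector's property mapping (an assumption).
props = {"kafkatopic": "trades", "kafkatype": "JSON"}

# pop returns the default instead of raising KeyError when the key is missing.
missing = props.pop("kafkapartition", None)

# update accepts a mapping (has .keys()), an iterable of pairs, or keywords.
props.update({"kafkatype": "CSV"})
props.update([("csvincludeschema", "once")])
props.update(snapshot=True)

# setdefault writes only when the key is absent, so the topic is unchanged.
topic = props.setdefault("kafkatopic", "ignored")

# popitem removes and returns one (key, value) pair as a 2-tuple.
# Which pair comes back depends on the mapping implementation.
key, value = props.popitem()
```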

Attributes

connector_key

property_defs