esppy.connectors.KafkaPublisher

class esppy.connectors.KafkaPublisher(kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, name=None, is_active=None, transactional=None, blocksize=None, dateformat=None, ignorecsvparseerrors=None, protofile=None, protomsg=None, configfilesection=None, csvfielddelimiter=None, noautogenfield=None, publishwithupsert=None, kafkainitialoffset=None, addcsvopcode=None, addcsvflags=None, kafkaglobalconfig=None, kafkatopicconfig=None, useclientmsgid=None, maxevents=None)

Bases: esppy.connectors.base.Connector

Publish events to a Kafka broker

Parameters
kafkahostport : string

Specifies one or more Kafka brokers in the following form: ‘host:port,host:port,…’.

kafkatopic : string

Specifies the Kafka topic.

urlhostport : string

Specifies the host:port field in the metadata topic that is subscribed to on start-up.

kafkapartition : string, optional

Specifies the Kafka partition.

kafkatype : string, optional

Specifies binary, CSV, JSON, or the name of a string field in the subscribed window schema.

name : string, optional

The name of the connector object.

transactional : string, optional

When kafkatype = CSV, sets the event block type to transactional. The default value is normal.

blocksize : int, optional

When kafkatype = CSV, specifies the number of events to include in a published event block. The default value is 1.

dateformat : string, optional

Specifies the format of ESP_DATETIME and ESP_TIMESTAMP fields in CSV events.

ignorecsvparseerrors : boolean, optional

Specifies that when a field in an input CSV event cannot be parsed, the event is dropped, an error is logged, and publishing continues.

protofile : string, optional

Specifies the .proto file that contains the Google Protocol Buffers message definition used to convert event blocks to protobuf messages. When you specify this parameter, you must also specify the protomsg parameter.

protomsg : string, optional

Specifies the name of a Google Protocol Buffers message in the .proto file that you specified with the protofile parameter. Event blocks are converted into this message.

configfilesection : string, optional

Specifies the name of the section in the connector config file to parse for configuration parameters.

csvfielddelimiter : string, optional

Specifies the character delimiter for field data in input CSV events. The default delimiter is the ‘,’ character.

noautogenfield : boolean, optional

Specifies that input events are missing the key field that is autogenerated by the source window.

publishwithupsert : boolean, optional

Specifies to build events with opcode=Upsert instead of opcode=Insert.

kafkainitialoffset : string or int, optional

Specifies the offset from which to begin consuming messages from the Kafka topic and partition. Valid values are “smallest”, “largest”, or an integer. The default value is “smallest”.

addcsvopcode : boolean, optional

Prepends an opcode and comma to input CSV events. The opcode is Insert unless publishwithupsert is enabled.

addcsvflags : string, optional

Specifies the event type to insert into input CSV events (with a comma). Valid values are “normal” and “partialupdate”.

kafkaglobalconfig : string, optional

Specifies a semicolon-separated list of “key=value” strings to configure librdkafka global configuration values.

kafkatopicconfig : string, optional

Specifies a semicolon-separated list of “key=value” strings to configure librdkafka topic configuration values.

useclientmsgid : boolean, optional

When the Source window has been restored from a persist to disk, ignores received binary event blocks that contain a message ID less than the greatest message ID in the restored window.

maxevents : int, optional

Specifies the maximum number of events to publish.

Returns
KafkaPublisher
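
Examples

A minimal construction sketch using the parameters documented above. The broker list, topic, ESP server address, and librdkafka settings are placeholder values chosen for illustration, not defaults.

    from esppy.connectors import KafkaPublisher

    pub = KafkaPublisher(
        kafkahostport='kafka01:9092,kafka02:9092',  # one or more brokers
        kafkatopic='sensor-events',                 # topic to read messages from
        urlhostport='esp-server:55555',             # host:port field in the metadata topic
        kafkatype='csv',                            # message format
        blocksize=64,                               # events per published event block
        publishwithupsert=True,                     # build events with opcode=Upsert
        # Semicolon-separated 'key=value' librdkafka global settings
        kafkaglobalconfig='compression.codec=snappy;message.max.bytes=1000000',
    )

In a typical esppy workflow, the resulting connector is then attached to a Source window so that messages read from Kafka are published as events into the project.
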
__init__(self, kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, name=None, is_active=None, transactional=None, blocksize=None, dateformat=None, ignorecsvparseerrors=None, protofile=None, protomsg=None, configfilesection=None, csvfielddelimiter=None, noautogenfield=None, publishwithupsert=None, kafkainitialoffset=None, addcsvopcode=None, addcsvflags=None, kafkaglobalconfig=None, kafkatopicconfig=None, useclientmsgid=None, maxevents=None)

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__(self[, kafkahostport, kafkatopic, …])

Initialize self.

clear(self)

copy(self[, deep])

Return a copy of the object

from_element(data[, session])

Construct connector from XML definition

from_parameters(conncls[, type, name, …])

from_xml(data[, session])

Construct connector from XML definition

get(self, key[, default])

items(self)

keys(self)

pop(self, key[, default])

Remove the specified key and return the corresponding value; if key is not found, default is returned if given, otherwise KeyError is raised.

popitem(self)

Remove and return a (key, value) pair as a 2-tuple; raise KeyError if the connector is empty.

set_properties(self, **kwargs)

Set connector properties

setdefault(self, key[, default])

to_element(self)

Export connector definition to ElementTree.Element

to_xml(self[, pretty])

Export connector definition to XML

update(*args, **kwds)

If E is present and has a .keys() method, does: for k in E: D[k] = E[k]. If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v. In either case, this is followed by: for k, v in F.items(): D[k] = v.

values(self)
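
Because a connector stores its configuration as a mapping of property names to values, the dictionary-style methods above can be combined with set_properties and the XML helpers. A short sketch continuing the example from the Examples section (the dateformat string is a placeholder):

    # Adjust properties after construction.
    pub.set_properties(maxevents=10000, dateformat='%Y-%m-%dT%H:%M:%S')

    # Dictionary-style access to the configured properties.
    print(pub.get('kafkatopic'))
    for key, value in pub.items():
        print(key, value)

    # Round-trip the connector definition through XML.
    xml = pub.to_xml(pretty=True)
    clone = KafkaPublisher.from_xml(xml)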

Attributes

connector_key

property_defs