esppy.connectors.KafkaPublisher¶
-
class
esppy.connectors.
KafkaPublisher
(kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, name=None, is_active=None, transactional=None, blocksize=None, dateformat=None, ignorecsvparseerrors=None, protofile=None, protomsg=None, configfilesection=None, csvfielddelimiter=None, noautogenfield=None, publishwithupsert=None, kafkainitialoffset=None, addcsvopcode=None, addcsvflags=None, kafkaglobalconfig=None, kafkatopicconfig=None, useclientmsgid=None, maxevents=None)¶ Bases:
esppy.connectors.base.Connector
Publish events to a Kafka broker
- Parameters
- kafkahostportstring
Specifies one or more Kafka brokers in the following form: ‘host:port,host:port,…’.
- kafkatopicstring
Specifies the Kafka topic
- urlhostportstring
Specifies the host:port field in the metadata topic that is subscribed to on start-up.
- kafkapartitionstring, optional
Specifies the Kafka partition
- kafkatypestring, optional
Specifies binary, CSV, JSON, or the name of a string field in the subscribed window schema.
- namestring, optional
The name of the connector object
- transactionalstring, optional
When kafkatype = CSV, sets the event block type to transactional. The default value is normal.
- blocksizeint, optional
When kafkatype = CSV, specifies the number of events to include in a published event block. The default value is 1.
- dateformatstring, optional
Specifies the format of ESP_DATETIME and ESP_TIMESTAMP fields in CSV events.
- ignorecsvparseerrorsboolean, optional
Specifies that when a field in an input CSV event cannot be parsed, the event is dropped, an error is logged, and publishing continues.
- protofilestring, optional
Specifies the .proto file that contains the Google Protocol Buffers message definition used to convert event blocks to protobuf messages. When you specify this parameter, you must also specify the protomsg parameter.
- protomsgstring, optional
Specifies the name of a Google Protocol Buffers message in the .proto file that you specified with the protofile parameter. Event blocks are converted into this message.
- configfilesectionstring, optional
Specifies the name of the section in the connector config file to parse for configuration parameters.
- csvfielddelimiterstring, optional
Specifies the character delimiter for field data in input CSV events. The default delimiter is the ‘,’ character.
- noautogenfieldboolean, optional
Specifies that input events are missing the key field that is autogenerated by the source window.
- publishwithupsertboolean, optional
Specifies to build events with opcode=Upsert instead of opcode=Insert.
- kafkainitialoffsetstring or int, optional
Specifies the offset from which to begin consuming messages from the Kafka topic and partition. Valid values are “smallest”, “largest”, or an integer. The default value is “smallest”.
- addcsvopcodeboolean, optional
Prepends an opcode and comma to write CSV events. The opcode is Insert unless publish_with_upsert is enabled.
- addcsvflagsstring, optional
Specifies the event type to insert into input CSV events (with a comma). Valid values are “normal” and “partialupdate”.
- kafkaglobalconfigstring, optional
Specifies a semicolon-separated list of “key=value” strings to configure librdkafka global configuration values.
- kafkatopicconfigstring, optional
Specifies a semicolon-separated list of “key=value” strings to configure librdkafka topic configuration values.
- useclientmsgidboolean, optional
If the Source window has been restored from a persist to disk, ignore received binary event blocks that contain a message ID less than the greatest message ID in the restored window.
- maxeventsint, optional
Specifies the maximum number of events to publish.
- Returns
-
__init__
(self, kafkahostport=None, kafkatopic=None, urlhostport=None, kafkapartition=None, kafkatype=None, name=None, is_active=None, transactional=None, blocksize=None, dateformat=None, ignorecsvparseerrors=None, protofile=None, protomsg=None, configfilesection=None, csvfielddelimiter=None, noautogenfield=None, publishwithupsert=None, kafkainitialoffset=None, addcsvopcode=None, addcsvflags=None, kafkaglobalconfig=None, kafkatopicconfig=None, useclientmsgid=None, maxevents=None)¶ Initialize self. See help(type(self)) for accurate signature.
Methods
__init__
(self[, kafkahostport, kafkatopic, …])Initialize self.
clear
(self)copy
(self[, deep])Return a copy of the object
from_element
(data[, session])Construct connector from XML definition
from_parameters
(conncls[, type, name, …])from_xml
(data[, session])Construct connector from XML definition
get
(self, key[, default])items
(self)keys
(self)pop
(self, key[, default])If key is not found, d is returned if given, otherwise KeyError is raised.
popitem
(self)as a 2-tuple; but raise KeyError if D is empty.
set_properties
(self, \*\*kwargs)Set connector properties
setdefault
(self, key[, default])to_element
(self)Export connector definition to ElementTree.Element
to_xml
(self[, pretty])Export connector definition to XML
update
(\*args, \*\*kwds)If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v
values
(self)Attributes
connector_key
property_defs