Apache Kafka as Target
Fivetran HVR supports integrating changes into a Kafka location. This section describes the configuration requirements for integrating changes into a Kafka location using Integrate and Refresh. For the list of supported Kafka versions into which HVR can integrate changes, see Integrate changes into location in Capabilities.
HVR uses the Apache Kafka client library (librdkafka) to send data packages to the Kafka message bus during Continuous Integrate and Bulk Refresh.
Kafka Message Format
By default, HVR sends messages in JSON format to a Kafka location. This behavior is equivalent to defining the action FileFormat with parameter JsonMode=SCHEMA_PAYLOAD and the location property MESSAGE BUNDLING (Kafka_Message_Bundling)=ROW.
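For illustration, below is a minimal Python sketch of a consumer reading these default JSON messages. It assumes the confluent-kafka client, a placeholder broker address, and a topic named mytable; HVR's exact field layout can vary with channel configuration.

```python
# Minimal sketch: consuming HVR's default JSON (SCHEMA_PAYLOAD) messages.
# Broker address, group id, and topic name are placeholders.
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "hvr-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["mytable"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        raise RuntimeError(msg.error())
    envelope = json.loads(msg.value())
    # In SCHEMA_PAYLOAD mode each message carries both the table schema
    # and one row of data (Kafka Connect style envelope).
    schema = envelope["schema"]    # column names and types
    payload = envelope["payload"]  # the replicated row values
    print(schema.get("name"), payload)
```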
If Schema Registry is used, the Kafka message format must be set to a compact AVRO-based or JSON-based format by defining the location property SCHEMA REGISTRY FORMAT (Kafka_Schema_Registry_Format). Note that the AVRO-based format is not true AVRO, because each message is not a valid AVRO file (e.g., it has no file header). Instead, each message is a 'micro AVRO' that comprises fragments of data encoded using the AVRO data type serialization format. Both the 'micro AVRO' and JSON formats, whether or not Schema Registry is used, adhere to Confluent's 'Kafka Connect' message format standard and are compatible with any implementation of Kafka sink connectors.
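Since these messages follow Confluent's standard, a consumer that does not use a Schema Registry client can still split off the framing. Below is a minimal sketch assuming Confluent's standard wire format (one magic byte, a 4-byte big-endian schema ID, then the serialized body); this is an illustration, not HVR-specific code.

```python
# Sketch: decoding the Confluent wire-format header that precedes each
# 'micro AVRO' message body when Schema Registry is used.
import struct

def parse_confluent_frame(message: bytes):
    # ">bI" = one signed magic byte followed by a 4-byte big-endian schema ID.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not a Confluent wire-format message")
    avro_body = message[5:]  # AVRO-serialized row fragment (no file header)
    return schema_id, avro_body
```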
To use the Cloudera Schema Registry, you must run it in Confluent-compatible mode. To do so, specify the URL in the format http://FQDN:PORT/api/v1/confluent in the location property SCHEMA REGISTRY URL (Kafka_Schema_Registry), where FQDN:PORT is the address of the Cloudera Schema Registry. This property can be set while creating a location or by editing the existing location's source and target properties.
When a Kafka location is configured with the location property SCHEMA REGISTRY URL (Kafka_Schema_Registry), the parameters Json, Xml, Csv, AvroCompression, and Parquet in action FileFormat cannot be used. Without Schema Registry, however, these parameters can be used to send messages in other formats. If parameter Avro is chosen without defining the location property SCHEMA REGISTRY URL, each message will be a valid AVRO file (including a header with the schema and column information), rather than Kafka Connect's more compact AVRO-based format.
Metadata for Messages
To process messages from HVR, a Kafka consumer often requires metadata about the replicated table, such as table and column names and data types. If the location is defined with the SCHEMA REGISTRY URL, the consumer can read this metadata directly from the registry. When using the JSON format with the default mode (JsonMode=SCHEMA_PAYLOAD), each message already contains this information. Alternatively, you can include metadata in each message by defining the action ColumnProperties with parameters Extra and IntegrateExpression, which allows you to add values like {hvr_tbl_name} and {hvr_op}.
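For example, the following action adds the source table name to every message (a sketch; the column name tbl_name and its length are illustrative):

Group | Table | Action | Parameter(s) |
---|---|---|---|
KAFKA | * | ColumnProperties | Name=tbl_name, Extra, Datatype=varchar, Length=128, IntegrateExpression="{hvr_tbl_name}" |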
Kafka Message Bundling and Size
By default, each Kafka message contains just one row, regardless of the format chosen. Multiple rows can be bundled into a single message using the location property MESSAGE BUNDLING (Kafka_Message_Bundling). This property can be defined while creating a Kafka location or by editing the existing location's source and target properties.
Although bundling of multiple rows can be combined with the Kafka Connect compatible formats (JSON with default mode SCHEMA_PAYLOAD), the resulting (longer) messages no longer conform to Confluent's 'Kafka Connect' standard.
For bundling modes TRANSACTION and THRESHOLD, the number of rows in each message is controlled by the location property MESSAGE THRESHOLD (Kafka_Message_Bundling_Threshold). In these modes, rows are bundled into the same message until the threshold is exceeded; once it is, the message is sent and new rows are bundled into the next message. The property MESSAGE THRESHOLD (Kafka_Message_Bundling_Threshold) has no effect on the bundling modes ROW and CHANGE.
By default, the minimum size of a Kafka message sent by HVR is 4096 bytes and the maximum size is 1,000,000 bytes. HVR will not send a message exceeding the maximum size; instead, it gives a fatal error. If the location property MESSAGE COMPRESS (Kafka_Message_Compress) is used, this error is raised by the Kafka broker. You can change the maximum Kafka message size that HVR will send by defining the environment variable HVR_KAFKA_MSG_MAX_BYTES, but ensure it does not exceed the maximum message size configured on the Kafka broker (setting message.max.bytes); messages exceeding the broker limit will be lost.
HVR_KAFKA_MSG_MAX_BYTES works in two ways:
- checks the size of each message and raises an HVR error if the maximum size is exceeded, even before transmitting it to a Kafka broker.
- checks the maximum size of compressed messages inside the Kafka transport protocol.
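For example, to lower the limit that HVR enforces, the variable can be set with an Environment action (a configuration sketch; the value is illustrative and must stay below the broker's message.max.bytes):

Group | Table | Action | Parameter(s) |
---|---|---|---|
KAFKA | * | Environment | Name=HVR_KAFKA_MSG_MAX_BYTES, Value=500000 |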
If a message is too big to be sent because it contains multiple rows, reducing bundling (e.g., MESSAGE BUNDLING=ROW) or lowering MESSAGE THRESHOLD can reduce the number of rows in each message. Otherwise, the number of bytes used for each row must be lowered, either with a more compact message format or by truncating column values (by adding action ColumnProperties with parameter TrimDatatype to the capture location).
Syncing Kafka, Interruption of Message Sending, and Consuming Messages with Idempotence
An HVR integrate job performs a sync of messages sent into Kafka at the end of each integrate cycle, rather than after each individual message. This means that if the job is interrupted while sending messages, then when it is restarted, multiple rows from the interrupted cycle may be sent again. Programs consuming Kafka messages must be able to cope with this repetition; this is called being 'idempotent'. One technique for idempotence is to track an increasing sequence in each message and use it to detect which messages have already been processed. A column with such an increasing sequence can be defined using the following action:
Group | Table | Action | Parameter(s) |
---|---|---|---|
KAFKA | * | ColumnProperties | Name=integ_key, Extra, Datatype=varchar, Length=45, IntegrateExpression="{hvr_integ_seq}" |
If HVR resends a message, its contents will be identical each time, including this sequence number.
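Below is a minimal Python sketch of such an idempotent consumer, assuming the default JSON (SCHEMA_PAYLOAD) format and the integ_key column defined above. The in-memory dict stands in for whatever durable store the application uses, and process() is a hypothetical handler.

```python
import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "hvr-idempotent-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["mytable"])

last_seen = {}  # highest integ_key processed, per topic/partition

def process(row):
    print(row)  # hypothetical application-specific handler

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        raise RuntimeError(msg.error())
    row = json.loads(msg.value())["payload"]
    key = (msg.topic(), msg.partition())
    seq = row["integ_key"]
    # hvr_integ_seq values are fixed-width strings, so lexicographic
    # comparison is assumed here to reflect their order.
    if seq <= last_seen.get(key, ""):
        continue  # duplicate: HVR resent this row after an interrupted cycle
    process(row)
    last_seen[key] = seq
```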
Kafka Message Keys and Partitioning
Kafka messages can contain a 'key' that Kafka uses to assign messages to partitions, so that consumption can be parallelized. HVR typically puts a key into each message containing a hash computed from the values of the row's 'distribution key' columns. This key is present only if the messages are in JSON or AVRO format, and is not present when the location property MESSAGE BUNDLING (Kafka_Message_Bundling) is set to TRANSACTION or THRESHOLD.
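The key can be inspected from a consumer, as in the short Python sketch below (assumes JSON format with ROW bundling, since keys are absent for the TRANSACTION and THRESHOLD bundling modes; broker address and topic name are placeholders):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "hvr-key-demo",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["mytable"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    # Rows with the same distribution-key hash always land in the same
    # partition, preserving per-key ordering across parallel consumers.
    print("partition:", msg.partition(), "key:", msg.key())
```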
Customize Integrate
By default, for Kafka, HVR does not replicate delete operations performed on the source location. To integrate delete operations, an extra time key column needs to be added on the target location.
For this, action ColumnProperties may be defined with the following parameters:
Group | Table | Action | Parameter(s) |
---|---|---|---|
KAFKA | * | ColumnProperties | Name=op_val, Datatype=integer, IntegrateExpression="{hvr_op}", Extra |
KAFKA | * | ColumnProperties | Name=integ_seq, Datatype=varchar, Length=45, IntegrateExpression="{hvr_integ_seq}", TimeKey, Extra |
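On the consumer side, these extra columns can drive how each change is applied. Below is a minimal Python sketch, assuming HVR's hvr_op encoding in which 0 denotes a delete (nonzero values denote inserts and updates) and a hypothetical primary-key column named id:

```python
import json

def apply_change(message_value: bytes, store: dict) -> None:
    row = json.loads(message_value)["payload"]
    key = row["id"]  # hypothetical primary-key column
    if row["op_val"] == 0:
        store.pop(key, None)   # hvr_op 0: delete replicated from the source
    else:
        store[key] = row       # insert or update
```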
Case Sensitive Table Names
By default, HVR stores all table names in lowercase. To store table names in uppercase on a Kafka target location, do the following:
- When creating a source location, select the option Case Sensitive Names at step 4 of the process.
- When creating the target Kafka location, select the option Default Topic and change the default value {hvr_tbl_name} to {hvr_tbl_base_name}.
Azure Event Hubs
When using Azure Event Hubs (a Kafka broker implementation) as a target location, you may get a Kafka policy violation error caused by a request/reply timeout. In this case, we recommend setting the HVR_KAFKA_PROPERTY environment variable for the Azure Event Hubs location. Here is an example configuration:
Group | Tables | Action | Parameter |
---|---|---|---|
Apache Kafka | * | Environment | Name=HVR_KAFKA_PROPERTY, Value="queue.buffering.max.messages=2000;request.timeout.ms=100000" |
Additionally, the following environment variables can be configured in certain cases.
- HVR_KAFKA_MSG_DELIVERY_TIMEOUT
This parameter sets the message delivery timeout. The default value is 5 minutes, but it can be increased if throughput is low while the queue.buffering.max.messages value is large. For example, if queue.buffering.max.messages=100000 and throughput is 1 unit, HVR_KAFKA_MSG_DELIVERY_TIMEOUT can be set to 16 minutes.
- HVR_KAFKA_RATE_LIMIT
This parameter limits the rate of outgoing messages. It can resolve cases with strict consumer limitations (for example, a consumer that cannot process more than 100 messages per second). Typical values for HVR_KAFKA_RATE_LIMIT are 1, 10, 100, or 1000 (messages per second). A configuration sketch for both variables is shown after the recommendations below.
- For a channel containing tables of different sizes, it is recommended to run a refresh for each table separately.
- For tables with minimal throughput units on Azure Event Hubs, it is recommended to run a refresh without parallelism, since parallelism can result in an error.
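As referenced above, these variables can be set with the same Environment action convention (a configuration sketch; the values are illustrative, with units of minutes and messages per second respectively, as described above):

Group | Tables | Action | Parameter |
---|---|---|---|
Apache Kafka | * | Environment | Name=HVR_KAFKA_MSG_DELIVERY_TIMEOUT, Value=16 |
Apache Kafka | * | Environment | Name=HVR_KAFKA_RATE_LIMIT, Value=100 |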
The following table provides the recommended settings for the queue.buffering.max.messages and request.timeout.ms configuration properties.
Property | Recommended Values | Throughput |
---|---|---|
queue.buffering.max.messages | 1000 (2000 is a maximum) | 1 unit |
queue.buffering.max.messages | 2000 (4000 is a maximum) | 2 units |
queue.buffering.max.messages | 40000 (80000 is a maximum) | 40 units |
request.timeout.ms | up to 30000 ms | |
For details on connecting to Azure Event Hubs, see section Location Connection for Apache Kafka.