User Manual
Introduction
InfoConnect for Kafka is a simple, lightweight, and responsive Kafka Connect-compatible product that enables IBM i (AS400, iSeries) replication and event-driven solutions with little to no custom development and IBM i knowledge required. The product is a native Kafka Connect plug-in intended for companies that already operate on, or plan to implement, Kafka infrastructure. It is certified by Confluent and is compatible with the majority of Kafka platforms and vendors, such as open source Apache Kafka, Confluent Platform, Confluent Cloud, IBM Event Streams, Azure Event Hubs, AWS MSK, Redpanda, and others.
Below are the typical use cases:
- Real-time IBM i data replication pipeline - works natively with InfoCDC and delivers data change events from InfoCDC Replication Queues to Kafka topics and on to the targets, with no custom IBM i or Kafka development
- Execute IBM i business logic (via program call or by automating the green-screen user action) based on a message in a Kafka topic
- Send messages to an IBM i data queue, or stream messages from a data queue to Kafka topics
Kafka-based event driven platforms are a diverse ecosystem that consists of core Cluster / Broker, APIs and SDKs for message producers and consumers to interact with the Broker, Kafka Connect clusters for hosting out-of-the-box connectors for major source and target systems (including IBM i with InfoConnect for Kafka), data processing and streaming tools, instrumentation / alerting / quality of service apps, and a healthy third party app ecosystem.
InfoConnect for Kafka is a Kafka Connect plugin that can be deployed either within the existing Kafka ecosystem or as a separate Kafka Connect stack. Below is a high-level diagram of typical Kafka-based data pipelines:
Companies that do not already operate Kafka Connect infrastructure or Kafka Brokers may consider leveraging a self-sufficient, independent component such as InfoConnect Hub, which can be deployed anywhere (including directly on IBM i) and serve as a real-time bridge between IBM i / InfoCDC and messaging platforms.
Contact us for connector pricing information, trial licenses, or any support-related questions.
Hardware, Software, and Network Requirements
IBM i
- Kafka Connect runtimes where InfoConnect for Kafka connectors are deployed must be able to reach IBM i servers on the following ports (see the connectivity check example after this list):
  - Non-TLS connections: 446, 449, 8470, 8472, 8473, 8475, 8476
  - TLS connections: 448, 449, 9470, 9472, 9473, 9475, and 9476
- IBM i must have the following host servers running in the QSYSWRK subsystem: CENTRAL, DTAQ, RMTCMD, SIGNON, and *SRVMAP.
- If a secure TLS connection is being used, the TLS certificate must be applied to the following services in Digital Certificate Manager: Central, Data Queue, Remote Command, Sign on, and DDM / DRDA services. Please refer to IBM i TLS Configuration for more details.
- The IBM i user ID must be authorized to perform operations on the intended IBM i objects.
- If there is additional security software in place that restricts remote execution functionality, the IBM i user ID defined in the connector configuration must be granted permission to execute remote calls and access database, IFS, and DDM services.
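Reachability of these host server ports can be checked from the Kafka Connect host before deploying the connector. Below is a minimal sketch using netcat; the host name ibmi.example.com is a placeholder, and the port list should match the TLS or non-TLS set you intend to use.
```
#!/usr/bin/env bash
# Check that the IBM i host servers are reachable from the Kafka Connect host.
# Replace ibmi.example.com with your IBM i host name or IP address.
IBMI_HOST=ibmi.example.com

# Non-TLS host server ports (use 448 and 9470-9476 for TLS connections instead).
for port in 446 449 8470 8472 8473 8475 8476; do
  if nc -vz -w 5 "$IBMI_HOST" "$port" 2>/dev/null; then
    echo "Port $port reachable"
  else
    echo "Port $port NOT reachable - check firewall rules and host servers"
  fi
done
```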
Kafka Connect
InfoConnect for Kafka is a lightweight plug-in that runs well even in scaled down Kafka Connect configurations. The hardware and software requirements strongly depend on the Kafka Broker and Kafka Connect editions, vendors, and deployment options.
Confluent Platform System Requirements is a good reference point when considering minimal vs production-ready configurations and supported operating systems.
In our tests and demos we used a moderately sized Linux VM (16 GB memory and 2 cores) to run the stand-alone Confluent Platform, including the Broker, Schema Registry, and Connect (as well as supporting services such as the management REST API, Control Center web UI, and ZooKeeper).
Compatibility Matrix
Application/Service | Version |
---|---|
Apache Kafka | 3.0.0 or higher |
IBM i / OS400 | V5R4 or higher |
Quick Start Guide
InfoConnect for Kafka is a set of connectors running within Kafka Connect workers, which interact with the Kafka Broker - the core event streaming infrastructure that operates topics, Schema Registry, and other services.
First things first! The easiest way to download our connector is from Confluent Hub, even if you plan to leverage a different Kafka flavor. Just download the self-managed Confluent Platform connector zip file from that page. Separately, contact us to request a trial license.
Customers running their Kafka Connect infrastructure in containerized environments should refer to their respective Kafka vendor documentation to determine the right way to download and install the connector. If downloading the connector automatically from Confluent Hub, for example as part of a Docker build or CI / CD script, please ensure that the firewall rules allow downloading the assets from Confluent Hub.
In case you plan to try this connector together with Change Data Capture or data replication tools, request the InfoCDC installation package and license in addition to the Kafka connector. Please refer to the InfoCDC documentation for more details on product installation and configuration.
The fast track to trying InfoConnect for Kafka depends on whether customers already operate a Kafka platform and Kafka Connect infrastructure, as well as on the specific Kafka vendor. Below are the three most typical scenarios.
Existing Kafka Broker and Kafka Connect infrastructure
Customers who already operate their Kafka platform as well as Kafka Connect only need to install InfoConnect for Kafka into their existing Kafka Connect environment. Confluent customers can follow the Confluent connector installation instructions, either via the Confluent CLI or by installing a previously downloaded connector manually. The process is similar for other Kafka vendors and platforms.
Kafka Connect workers may have to be restarted to discover the newly installed connector; customers can then configure IBM i connector operations.
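For Confluent-based environments, a minimal sketch of a manual installation from a previously downloaded archive is shown below. The archive name, plugin directory, and worker restart command are assumptions; adjust them to your environment and vendor tooling.
```
# Install the downloaded connector archive using the Confluent Hub client
# (archive name and paths are examples - adjust to your download location).
confluent-hub install /tmp/infoconnect-for-kafka.zip --no-prompt

# Alternatively, unzip the archive into a directory listed in the worker's
# plugin.path property, then restart the Kafka Connect worker so the new
# plugin is discovered, e.g.:
# unzip /tmp/infoconnect-for-kafka.zip -d /usr/share/confluent-hub-components/
# systemctl restart confluent-kafka-connect   # or restart your Connect container
```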
Existing Kafka Broker, no Kafka Connect infrastructure
Customers who do not currently operate Kafka Connect infrastructure but already leverage foundational Kafka services (such as Kafka topics, Schema Registry, etc.) can follow their Kafka vendor's instructions to set up and run a minimal Kafka Connect configuration. For example, Confluent customers can use the Confluent Kafka Connect Quick Start guide to set up Kafka Connect in the appropriate deployment configuration, the simplest being stand-alone mode in their development environment.
Once Kafka Connect is up and running and can talk to the Kafka Broker, follow the steps above to download and install the InfoConnect for Kafka plugin, then configure IBM i connector operations.
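Once the worker is running, its REST API can be used to confirm that it is healthy and attached to the broker. A quick check is sketched below; the host name and the default Connect port 8083 are assumptions for a local stand-alone setup.
```
# Check that the Kafka Connect worker is up and reports the Kafka cluster it is
# attached to (the response includes the worker version and kafka_cluster_id).
curl -s http://localhost:8083/

# List the connector plugins the worker has discovered.
curl -s http://localhost:8083/connector-plugins
```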
No Kafka Broker or Kafka Connect infrastructure
For customers that do not currently operate any Kafka services, the easiest way to evaluate the IBM i connectivity is to install and run both Kafka Broker and Kafka Connect services in a single environment. Depending on the preferred Kafka edition and vendor, follow their respective quick start guides. For example:
- Apache Kafka Quick Start (follow the instructions up to step 6, installing Kafka Connect)
- Confluent Platform Docker Quick Start
- Confluent Cloud Quick Start
We recommend the simplest stand-alone configuration for initial trial purposes. Our Engineering team can help you determine the appropriate topology and deployment model for Kafka Brokers and Kafka Connect components that meets your initial evaluation and production-ready target state requirements.
Once the Kafka Broker and Kafka Connect services are running successfully, follow the installation steps above to enable the InfoConnect for Kafka connectors, then configure IBM i connector operations.
Customers who already run on different messaging and middleware platforms may consider leveraging InfoConnect Hub as a self-contained simplified IBM i integration component that surfaces IBM i business logic as REST APIs and supports most popular messaging stacks. Contact us to find out more details and discuss the best option with our team.
Sample Stand-Alone Confluent Docker Setup
Below is one way to quickly set up all required Kafka Broker, Schema Registry, Kafka Connect, and InfoConnect IBM i connector components using Confluent Platform. Note that depending on the host OS and your specific configuration you may have to make changes, for example specifying the fully qualified path to the license and truststore folder, and other changes as necessary.
- Install Docker and review the sample Docker file.
- Download and install the InfoConnect license file as400-license.lic into a local directory, for example:
/home/ubuntu/license/:/opt/
- Download and start all pre-requisite components:
docker-compose up -d
Verify that all services are up and running:
docker-compose ps -a
- Verify that the license is copied to the Docker image's /opt/ folder:
docker exec -it connect bash
Once in the Docker shell, execute:
cd /opt/
ls -l
You should see the AS400 license file in that folder, similar to the screenshot below.
- To verify that the InfoConnect for Kafka connector installation was successful, execute the following commands while in the Docker image shell:
cd /usr/share/confluent-hub-components/
ls -l
You should see the IBM i (AS400) connector in the list, similar to the screenshot below.
- Access Confluent Control Center at http://{environment-host}:9021. Connect components should list AS400 Data Queue Source and Sink, as well as Program Call Sink components, similar to the screenshot below. Alternatively, the installed plugins can be verified through the Kafka Connect REST API, as shown after this list.
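As an alternative to the Control Center UI, the same verification can be done against the Kafka Connect REST API. The sketch below assumes the Connect container publishes the default port 8083 on the Docker host.
```
# List installed connector plugins and filter for the AS400 connector classes.
# Pipe through "python3 -m json.tool" or "jq ." for pretty-printed output.
curl -s http://localhost:8083/connector-plugins | grep -i as400
```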
AS400 Connection Configuration Properties
- Connection
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Name | Enter a unique label for the connector in your application. | Required | AS400SourceConnectorConnector_0 | name |
AS400 URL | AS400 system connection url. | Required | null | as400.url |
User Id | AS400 System user | Required | null | as400.userId |
PASSWORD | AS400 system connection password. | Required | null | as400.password |
License protocol | Refer to the License Management section below. | Required | null | as400.license.protocol |
Truststore file protocol | Refer to the Truststore Management section below. | Optional | null | Truststore file protocol |
IASP | Independent auxiliary storage pool (IASP) name, if used. | Optional | null | as400.iasp |
Library List | List of libraries, in addition to the library list associated with the user id. The libraries must be separated with comma and will be added to the top of the library list. | Optional | null | as400.libraries |
Secure Connection | Enable secure connection with AS400 over encrypted channel. | Optional | False | as400.secure.connection |
Socket Timeout | Socket Timeout, ms. Default value of -1 means the default JVM SO_TIMEOUT will be used | Optional | -1 | as400.socket.timeout |
Time unit to be used for Socket Timeout | Socket Timeout time unit | Optional | MILLISECONDS | as400.socket.timeunit |
Connection Retries | Number of times to retry establishing the connection internally before throwing an exception and passing back to Kafka connection Manager. | Optional | 3 | as400.connection.retry |
Reconnection Period | Time between internal reconnection retries in ms. | Optional | 60000 | as400.reconnection.period |
Time unit to be used for Reconnection Period | Reconnection period time unit. | Optional | MILLISECONDS | as400.reconnection.timeunit |
Connection Time to Live | Max time (Seconds) that the connection can be used. | Optional | 0 | as400.connection.live |
Time unit to be used for Connection Time to Live | Connection Time to Live time unit. | Optional | SECONDS | |
- Connection (Optional)
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Operation Type | Operation type to be performed on AS400: FILE = 0, PRINT = 1, COMMAND = 2, DATAQUEUE = 3, DATABASE = 4, RECORDACCESS = 5, CENTRAL = 6, SIGNON = 7 | Optional | 2 | as400.operation.type |
CCSID | CCSID stands for "Coded Character Set Identifier." It is a numerical representation that defines a specific character encoding or character set used in IBM systems, including the IBM i (AS/400) platform. | Optional | 0 | as400.ccsid |
Pre Start Count Data Queue | - | Optional | 2 | as400.prestart.count.dq |
Pre Start Count Command | - | Optional | 2 | as400.prestart.count.command |
Cleanup Interval | - | Optional | 2 | as400.cleanup.interval |
Max Connection | Maximum connections allowed. | Optional | 5 | as400.max.connection |
Max Inactivity | Maximum time of inactivity allowed for a connection. | Optional | 10 | as400.max.inactivity |
Max Lifetime | Maximum lifetime for connection. | Optional | 60000 | as400.max.lifetime |
Max Use Count | - | Optional | 10 | as400.max.usecount |
Max Use Time | - | Optional | 30000 | as400.max.usetime |
Pre-Test Connection | - | Optional | true | as400.pretest.connection |
Run Maintenance | - | Optional | true | as400.run.maintenance |
Thread Used | - | Optional | true | as400.thread.used |
Keep Alive | - | Optional | true | as400.keep.alive |
Login Timeout | - | Optional | 0 | as400.login.timeout |
Receive Buffer Size | - | Optional | 1000 | as400.receive.buffer.size |
Send Buffer Size | - | Optional | 1000 | as400.send.buffer.size |
So Linger | - | Optional | 0 | as400.so.linger |
So Timeout | - | Optional | 0 | as400.so.timeout |
TCP No Delay | - | Optional | true | as400.tcp.nodelay |
License Management:
InfoConnect for Kafka requires a valid license. Contact us to request a free trial license or purchase a long-term license. The Infoview Sales team will provide a license file as400-license.lic that includes entitlement to work with specific IBM i servers.
InfoConnect for Kafka supports various local and shared locations and protocols for storing the license (a configuration validation sketch follows the table below):
Protocols | Parameters to configure | Mandatory | configuration keys for parameters |
---|---|---|---|
FILE | path, filename | required | as400.license.path, license.fileName |
S3 | S3 bucket path, filename, S3 region, Access key, Secret key | required | s3.bucket, license.fileName, s3.region, s3.accessKey, s3.secretKey |
S3_IAM | S3 bucket path, filename, S3 region | required | s3.bucket, license.fileName, s3.region |
FTP | Host, directory path, filename, username, password | required | ftp.host, ftp.dir.path, license.fileName, ftp.username, ftp.password |
CLASSPATH | filename | required | license.fileName |
HTTP | HTTP Host, HTTP Directory, filename, HTTP Username, HTTP Password | required | http.url, http.dir.path, license.fileName, http.username, http.password |
HTTPS | HTTPS Host, HTTPS Directory, filename, HTTPS Username, HTTPS Password | required | https.url, https.dir.path, license.fileName, https.username, https.password |
BROKER | License Topic | required | license.licenseTopic |
DIRECT | License Text | required | license.licenseText |
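Before deploying a connector, the Kafka Connect REST API can be used to validate that the license-related properties are accepted. Below is a minimal sketch for the FILE protocol; the Connect host and port, the plugin alias AS400SourceConnector in the URL (assumed to match the connector class name), and the connection values are placeholders.
```
# Validate a minimal AS400 source configuration, including FILE-protocol license
# properties, against the installed connector plugin without creating it.
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connector-plugins/AS400SourceConnector/config/validate \
  -d '{
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-library",
    "source.kafka.topic": "Test"
  }'
```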
Truststore Management:
The IBM i connector requires a truststore file "info400new.truststore" when the secure connection property is set to true. This enables access to specific IBM i system(s) over TLS connection. Managing the truststore file can be done in various ways using different protocols, including S3, HTTP/HTTPS, FTP, FILE, etc., and accessing it through these protocols requires configuration in the connector settings.
Available protocols for loading the truststore file include HTTP, HTTPS, FTP, S3, FILE, and CLASSPATH. The specific parameters to configure depend on the chosen protocol.
Configure the following properties based on the chosen protocol (a keytool sketch for creating the truststore file follows the table):
Protocols | Parameters to configure | Mandatory | configuration keys for parameters |
---|---|---|---|
FILE | path, filename | required | as400.tlsFile.path, truststore.fileName |
S3 | S3 bucket path, filename, S3 region, Access key, Secret key | required | s3.bucket.tls, truststore.fileName, s3.region.tls, s3.accessKey.tls, s3.secretKey.tls |
S3_IAM | S3 bucket path, filename, S3 region | required | s3.bucket.tls, truststore.fileName, s3.region.tls |
FTP | Host, directory path, filename, username, password | required | ftp.host.tls, ftp.dir.path.tls, truststore.fileName, ftp.username.tls, ftp.password.tls |
CLASSPATH | filename | required | license.fileName |
HTTP | HTTP Host, HTTP Directory, filename, HTTP Username, HTTP Password | required | http.url.tls, http.dir.path.tls, truststore.fileName, http.username.tls, http.password.tls |
HTTPS | HTTPS Host, HTTPS Directory, filename, HTTPS Username, HTTPS Password | required | https.url.tls, https.dir.path.tls, truststore.fileName, https.username.tls, https.password.tls |
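If you do not already have the info400new.truststore file, one way to create it is to import the certificate authority (or server) certificate used by the IBM i TLS services with the JDK keytool, as sketched below. The certificate file name and the store password are placeholders.
```
# Import the IBM i CA/server certificate (exported from Digital Certificate
# Manager) into a new truststore named info400new.truststore.
keytool -importcert \
  -alias ibmi-tls \
  -file ibmi-ca-certificate.cer \
  -keystore info400new.truststore \
  -storepass changeit \
  -noprompt

# Verify that the certificate was imported.
keytool -list -keystore info400new.truststore -storepass changeit
```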
Connector Operations
The connector provides support for the following operations:
Operation | Description |
---|---|
InfoCDC / Data Queue Source | The connector continuously monitors for new messages that arrive at a designated data queue. |
Data Queue Sink | The connector can write messages to a data queue. |
Program Call Sink | The connector is able to execute an IBM i program. |
Screen Automation (RPA) Sink | Automates green-screen user navigation. |
AS400 Source Connector Configuration Properties
Configure these connector properties.
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Data Queue | Read data queue name. | Required | null | as400.read.dataqueue.name |
Library | Read data queue library. | Required | null | as400.read.dataqueue.library |
Key | Must be specified for keyed data queues and left blank for non-keyed data queues; used for reading messages from the data queue by key. | Optional | null | as400.read.dataqueue.key |
Key Search Type | Must be specified for keyed data queues. Available search types for reading messages from the data queue are: equal, not equal, greater than, less than, greater than or equal, less than or equal. | Optional | null | as400.read.dataqueue.key.search.type |
Keep messages in Queue | Ensure it is unchecked unless the intent is to leave the message in the queue after reading. | Optional | true | as400.source.keep.message |
Format File Name | Optional parameter allows treating data queue entry as an externally defined data structure. When defined, the connector will dynamically retrieve the record format from the specified IBM i file, and parse the received data queue entry into the map of field name / value pairs. The connector will perform the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.source.format.name |
Format File Library | When format file is specified, the format file library can also be specified, otherwise the format file will be located based on the connection library list. | Optional | null | as400.source.file.library |
Format File Exception Action | If a message fails due to a format file error, the connector will either discard the message or write it back to the queue, based on the option provided. | Optional | discard | as400.source.format.action |
Is InfoCDC Configured | Should be true if InfoCDC is subscribed to this data queue. | Optional | true | as400.isInfoCDCConfigured |
Number of Consumers | Number of consumers. | Optional | 4 | as400.source.consumer.numbers |
JsonConverter(Source)
Note: Here is a sample property for AS400 source connector with JsonConverter configuration.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type set to greater than or equal indicates that entries whose key is greater than or equal to the specified value will be read.
You can replace the search type with equal, not equal, greater than, less than, greater than or equal, or less than or equal, depending on your requirements.
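Once the source connector is deployed, a quick way to confirm that data queue entries are reaching Kafka is to consume from the configured topic. The sketch below assumes a local broker on port 9092 and the topic name Test used in the sample configuration (use kafka-console-consumer.sh with Apache Kafka distributions).
```
# Consume the records produced by the AS400 source connector from the beginning
# of the topic configured in source.kafka.topic.
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic Test \
  --from-beginning
```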
JsonSchemaConverter(Source)
Note: Here is a sample property for AS400 source connector with JsonSchemaConverter and schema registry configuration.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type set to greater than or equal indicates that entries whose key is greater than or equal to the specified value will be read.
You can replace the search type with equal, not equal, greater than, less than, greater than or equal, or less than or equal, depending on your requirements.
AvroConverter(Source)
Note: Here is a sample property for AS400 source connector with AvroConverter configuration.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type set to greater than or equal indicates that entries whose key is greater than or equal to the specified value will be read.
You can replace the search type with equal, not equal, greater than, less than, greater than or equal, or less than or equal, depending on your requirements.
Source Acknowledgement
Acknowledgement can be done in two ways: either by writing an acknowledgement to a response data queue, or by calling a program with the necessary parameters.
To write an acknowledgement back to the data queue, the following configuration is required:
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Data Queue | Response data queue name to which the acknowledgement is written. | Optional | null | as400.write.dataqueue.name |
Library | Response data queue library. | Optional | null | as400.write.dataqueue.library |
is Keyed DataQ | Indicates whether the response data queue is keyed. | Optional | null | as400.write.dataqueue.key |
To call a program for acknowledgement, the following configuration is required:
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Program name | Name of the program | Optional | null | as400.program.name |
Program Library | Library of the program you want to invoke | Optional | null | as400.program.library |
Program Library List | Library list | Optional | null | as400.program.libraryList |
Here is the configuration to send the response through a program call. The program INFOCDCCOM/CDCPGMACKR is in place to accept and process the acknowledgements.
"source.ack.type": "Program",
"as400.program.name": "CDCPGMACKR",
"as400.program.library": "INFOCDCCOM",
"as400.program.libraryList": "INFOCDCCOM, INFOCDCDTA",
AS400 Data Queue Sink Connector Configuration Properties
Configure these connector properties.
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Data Queue | Write data queue name. | Required | null | as400.write.dataqueue.name |
Library | Write data queue library. | Required | null | as400.write.dataqueue.library |
Is Keyed DataQ | Set to true for keyed data queues and leave blank (false) for non-keyed data queues. | Optional | false | as400.write.dataqueue.key |
Format File Name | Optional parameter allows treating data queue entry as an externally defined data structure. When defined, the connector will dynamically retrieve the record format from the specified IBM i file and parse the data queue entry into the map of field name / value pairs. The connector will perform the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.sink.format.name |
Format File Library | When format file is specified, the format file library can also be specified, otherwise the format file will be located based on the connection library list. | Optional | null | as400.sink.file.library |
DQ Entry Length | Max DQ Entry Length. When specified and greater than 0, the parameter value will be truncated to fit the max length. | Optional | 0 | as400.dq.entry.length |
DQ Key Length | Max DQ Key Length. When specified and greater than 0, the parameter value will be used (instead of dynamically retrieving it from DQ definitions on the server). | Optional | null | as400.dq.key.length |
JsonConverter(Sink)
Note: Here is a sample property for AS400 Data Queue Sink connector with JsonConverter and Dead letter Queue configuration.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "json_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "json_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
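To exercise the Data Queue Sink end to end, you can publish a test record to the topic the sink reads from. The sketch below assumes a local broker on port 9092, the topic name Test from the sample configuration, and a JSON payload whose field names (placeholders here) match the configured format file (use kafka-console-producer.sh with Apache Kafka distributions).
```
# Publish a single test message to the sink's input topic; the connector should
# write a corresponding entry to the configured data queue.
# Note: older Kafka distributions use --broker-list instead of --bootstrap-server.
echo '{"ORDERID":"12345","STATUS":"NEW"}' | kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic Test
```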
JsonSchemaConverter(Sink)
Note: Here is a sample property for AS400 Data Queue Sink connector with JsonSchemaConverter, schema registry and Dead letter Queue configuration.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
AvroConverter(Sink)
Note: Here is a sample property for AS400 Data Queue Sink connector with AvroConverter, Schema Registry, and Dead Letter Queue configuration.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "avro_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "avro_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
AS400 Program Call Sink Connector Configuration Properties
Configure these connector properties.
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Program Name | AS400 program name | Required | null | as400.program.name |
Program Library | Program library name | Required | null | as400.program.library |
Program Parameters | List of definitions and value references of program parameters | Optional | null | as400.program.parameters |
Procedure Name | Name of the procedure | Optional | null | as400.procedure.name |
Procedure Returns Value | Indicator if the program procedure returns a value. | Optional | false | as400.procedure.returnsValue |
Threadsafe | Indicator if the program is thread safe | Optional | false | as400.threadsafe |
Sink Target Topic | Sink target topic is a topic to push program call output | Required | null | sink.kafka.topic |
Kafka Partition Key | Kafka partition key to push record to topic in the specific location | Required | null | sink.kafka.partition.key |
Note: Here is a sample property for AS400 Program Call Sink connector configuration
{
"name": "AS400ProgramCallSinkConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "CRTORDER_INPUT",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.program.name": "program-name",
"as400.program.library": "KFKDEMOS",
"as400.program.libraryList": "comma-separated-list-of-libraries"
"as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0,
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\":
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\",
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\":
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ {
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5,
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\",
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
"as400.threadsafe": "false",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
Note: The connector can call both Programs (*PGM objects) and Service Programs (*SRVPGM objects) on the IBM i (AS/400) platform. Here's how you can use the as400.procedure.name property when calling these objects:
For PGM Object:
When calling a Program (*PGM object), you typically don't need to specify a procedure name, as Programs in this context are generally standalone units of code that are executed without calling a specific procedure within them.
For SRVPGM Object:
Service Programs (*SRVPGM objects) are designed to be reused by multiple programs, and they can contain multiple exported procedures that can be called individually. When calling a Service Program and you intend to use a specific procedure within it, the procedure name must match the exported name of the procedure you want to execute.
The as400.procedure.name property can be used in the connector configuration to specify the procedure name when calling a Service Program.
Note: Here's an example of how you might use it in your connector configuration:
{
"name": "AS400ProgramCallSinkConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "CRTORDER_INPUT",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.program.name": "program-name",
"as400.program.library": "KFKDEMOS",
"as400.program.libraryList": "comma-separated-list-of-libraries",
"as400.program.libraryList.mode": "ADD_FIRST",
"as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0,
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\":
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\",
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\":
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ {
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5,
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\",
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
"as400.threadsafe": "false",
"value.converter.schemas.enable": "false",
"as400.procedure.name":"your-procedure-name"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
IBM i program parameter definitions:
```
d main pr extpgm('CRTORDER')
d srcOrder 8p 0
d trgtOrder 8p 0
d ordAmt 10p 2
d ordSts 10a
d nbrLines 4p 0
d linesIn likeds(linesInFmt) dim(10)
d linesInfmt DS Dim(10) qualified
d itemNo 10a
d qty 5p 0
d price 10p 2
```
Example: Input Structure
```
{
"sourceOrderId": 11934700,
"targetOrderId": 11934700,
"orderAmt": 1000.00,
"orderStatus": "Activated",
"noOfItems": 2,
"linesIn": [
{
"itemNo": "111",
"quantity": 10,
"price": 10.00
},
{
"itemNo": "222",
"quantity": 90,
"price": 10.00
}
]
}
```
as400.program.parameters: the program parameters must be transformed into the following structure:
{
"params": [
{
"parameterName": "sourceOrderId",
"sourceFieldName": "$['sourceOrderId']",
"dataType": "PACKED",
"length": 8,
"decimalPositions": 0,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "targetOrderId",
"sourceFieldName": "$['targetOrderId']",
"dataType": "PACKED",
"length": 8,
"decimalPositions": 0,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "orderAmt",
"sourceFieldName": "$['orderAmt']",
"dataType": "PACKED",
"length": 10,
"decimalPositions": 2,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "orderStatus",
"sourceFieldName": "$['orderStatus']",
"dataType": "STRING",
"length": 10,
"decimalPositions": 0,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "numberOfLines",
"sourceFieldName": "$['noOfItems']",
"dataType": "PACKED",
"length": 4,
"decimalPositions": 0,
"usage": "IN",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "orderLines",
"sourceFieldName": "$['linesIn']",
"dataType": "STRUCTURE",
"length": 0,
"decimalPositions": 0,
"usage": "INOUT",
"count": 10,
"dataStructureElements": [
{
"parameterName": "itemNo",
"sourceFieldName": "$['itemNo']",
"dataType": "STRING",
"length": 10,
"decimalPositions": 0,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "quantity",
"sourceFieldName": "$['quantity']",
"dataType": "PACKED",
"length": 5,
"decimalPositions": 0,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
},
{
"parameterName": "price",
"sourceFieldName": "$['price']",
"dataType": "PACKED",
"length": 10,
"decimalPositions": 2,
"usage": "INOUT",
"count": 1,
"dataStructureElements": []
}
]
}
]
}
Note: Input structure field names must match the respective sourceFieldName values so that the connector can map the values accordingly.
Once you have the structure above, it needs to be stringified (escaped) so that it can be passed as a single property value; the result will look like the following:
{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0,
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\":
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\",
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\":
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ {
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5,
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\",
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }
The Program Call Sink connector configuration is now ready, and the connector can be deployed.
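One way to deploy the finished configuration is to submit it to the Kafka Connect REST API. The sketch below assumes the configuration has been saved to a local file program-call-sink.json (in the {"name": ..., "config": {...}} form shown above) and that the worker listens on the default port 8083.
```
# Deploy the Program Call Sink connector by posting its configuration
# to the Kafka Connect REST API.
curl -s -X POST -H "Content-Type: application/json" \
  --data @program-call-sink.json \
  http://localhost:8083/connectors

# Check the connector and task status after deployment.
curl -s http://localhost:8083/connectors/AS400ProgramCallSinkConnector/status
```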
Schema Registry Configuration
Schema Registry must be configured for schema-based converters to avoid problems registering updated schemas after the Format File is changed.
Avro
io.confluent.connect.avro.AvroConverter
JSON Schema
io.confluent.connect.json.JsonSchemaConverter
Schema Registry can be configured in 3 ways:
1. Through Confluent Control Center
Open the Topics menu and choose the topic where new messages are published. Then click on the Schema tab; you will see the already registered schema. Click on the ... pull-down menu and choose Compatibility settings.
Then choose the compatibility level NONE and save the changes.
These changes apply only to the edited topic. If you have other topics where messages are published with a schema, you need to repeat these steps for them as well. If you don't want to repeat these steps for every new topic, use the other options below.
2. Through the Confluent Platform properties file
Open the Confluent installation folder and go to the /etc/schema-registry/ folder.
Edit the schema-registry.properties file, for example with the following command:
nano schema-registry.properties
Add a new property at the end of the file:
schema.compatibility.level=none
Save the changes and restart Confluent Platform if needed to apply them. Compatibility will then default to NONE for new schemas.
3. Through the Docker Compose file
Add a new property for the schema-registry container in the environment section:
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: none
Then restart the Docker container with the following command if needed to apply the changes:
docker-compose up -d
Compatibility will then default to NONE for new schemas. The same setting can also be applied at runtime through the Schema Registry REST API, as shown below.
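For completeness, here is a sketch of changing the compatibility level through the Schema Registry REST API, either globally or per subject. It assumes Schema Registry on the default port 8081 and a value subject named Test-value; both are placeholders.
```
# Set the global default compatibility level to NONE.
curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config

# Or set compatibility to NONE for a single subject (e.g. the value schema
# of the topic "Test").
curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config/Test-value
```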
Log Exceptions
The connector can log all critical exceptions to a dedicated topic (as400-error-log) when log.exceptions=true. The default value of log.exceptions is false, which means the connector will not log exceptions to the topic.
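When exception logging is enabled, the error topic can be inspected with the standard console consumer. The sketch below assumes a local broker on port 9092 (use kafka-console-consumer.sh with Apache Kafka distributions).
```
# Read the connector's exception log topic from the beginning.
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic as400-error-log \
  --from-beginning
```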
Contact Us
Contact us for connector pricing info, trial license, or support questions.