User Manual

Introduction

InfoConnect for Kafka is a simple, lightweight, and responsive Kafka Connect-compatible product that enables IBM i (AS400, iSeries) replication and event-driven solutions with little to no custom development or IBM i knowledge required. The product is a native Kafka Connect plug-in intended for companies that already operate on, or plan to implement, Kafka infrastructure. It is certified by Confluent and is compatible with the majority of Kafka platforms and vendors, such as open source Apache Kafka, Confluent Platform, Confluent Cloud, IBM Event Streams, Azure Event Hubs, AWS MSK, Redpanda, and others.

Below are the typical use cases:

  • Real-time IBM i data replication pipeline - works natively with InfoCDC and delivers data change events from InfoCDC Replication Queues to Kafka topics and on to the targets, with no custom IBM i or Kafka development
  • Execute IBM i business logic (via a program call or by automating green screen user actions) based on a message in a Kafka topic
  • Send messages to an IBM i Data Queue, or stream messages from a Data Queue to Kafka topics

Kafka-based event-driven platforms form a diverse ecosystem that consists of the core Cluster / Broker, APIs and SDKs for message producers and consumers to interact with the Broker, Kafka Connect clusters for hosting out-of-the-box connectors for major source and target systems (including IBM i with InfoConnect for Kafka), data processing and streaming tools, instrumentation / alerting / quality of service apps, and a healthy third-party app ecosystem.

InfoConnect for Kafka runs as a Kafka Connect plugin, either within an existing Kafka ecosystem or on a separate Kafka Connect stack. Below is a high-level diagram of a typical Kafka-based data pipeline:

image

Companies that do not already operate Kafka Connect infrastructure or Kafka Brokers may consider a self-sufficient, independent component such as InfoConnect Hub, which can be deployed anywhere (including directly on IBM i) and serves as a real-time bridge between IBM i / InfoCDC and messaging platforms.

Contact us for connector pricing information, trial licenses, or any support-related questions.

Hardware, Software, and Network Requirements

IBM i

  • Kafka Connect runtimes where InfoConnect for Kafka connectors are deployed must be able to reach IBM i servers on the following ports:

      • Non-TLS connections: 446, 449, 8470, 8472, 8473, 8475, 8476

      • TLS connections: 448, 449, 9470, 9472, 9473, 9475, 9476

  • IBM i must have the following host servers running in the QSYSWRK subsystem: *CENTRAL, *DTAQ, *RMTCMD, *SIGNON, and *SVRMAP.

  • If a secure TLS connection is being used, the TLS certificate must be applied to the following services in Digital Certificate Manager: Central, Data Queue, Remote Command, Sign on, and DDM / DRDA services. Please refer to IBM i TLS Configuration for more details.

  • The IBM i user ID must be authorized to perform operations on the intended IBM i objects.

  • If there is additional security software in place that restricts remote execution functionality, the IBM i user ID defined for the connector configuration must be granted permission to execute remote calls and access database, IFS, and DDM services.
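
The port requirements above can be checked from the machine that hosts Kafka Connect before any connector is configured. Below is a minimal sketch, assuming plain TCP reachability is a sufficient first test; the function name `check_ports` and the host name in the usage comment are illustrative and not part of the product.

```python
import socket

# Ports copied from the requirements list above.
NON_TLS_PORTS = [446, 449, 8470, 8472, 8473, 8475, 8476]
TLS_PORTS = [448, 449, 9470, 9472, 9473, 9475, 9476]


def check_ports(host, ports, timeout=3.0):
    """Return {port: bool}, True when a TCP connection to host:port succeeds."""
    results = {}
    for port in ports:
        try:
            # create_connection performs the TCP handshake; any failure
            # (refused, timed out, unresolved host) raises OSError.
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results


# Example usage (placeholder host name):
#   for port, ok in check_ports("your-ibm-i-host", NON_TLS_PORTS).items():
#       print(port, "reachable" if ok else "NOT reachable")
```

A successful TCP connection only shows that the port is open through the network path; it does not confirm that the matching host server is configured correctly or that the user ID is authorized.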

Kafka Connect

InfoConnect for Kafka is a lightweight plug-in that runs well even in scaled down Kafka Connect configurations. The hardware and software requirements strongly depend on the Kafka Broker and Kafka Connect editions, vendors, and deployment options.

Confluent Platform System Requirements is a good reference point when considering minimal vs production-ready configurations and supported operating systems.

In our tests and demos we used a moderately sized Linux VM (16 GB memory and 2 cores) to run a stand-alone Confluent Platform, including Broker, Schema Registry, and Connect (as well as supporting services such as the management REST API, Control Center web UI, and Zookeeper).

Compatibility Matrix

| Application/Service | Version |
| --- | --- |
| Apache Kafka | 3.0.0 or higher |
| IBM i / OS400 | V5R4 or higher |

Quick Start Guide

InfoConnect for Kafka is a set of connectors running within Kafka Connect workers, which interact with Kafka Broker - the core event streaming infrastructure that operates topics, schema registry, and other services.

First things first! The easiest way to download our connector is from Confluent Hub, even if you plan to leverage a different Kafka flavor. Just download the self-managed Confluent Platform connector zip file from that page. Separately, contact us to request a trial license.

Customers running their Kafka Connect infrastructure in containerized environments should refer to their respective Kafka vendor documentation to determine the right way to download and install the connector. If downloading the connector automatically from Confluent Hub, for example as part of a Docker build or CI / CD script, please ensure that the firewall rules allow downloading the assets from Confluent Hub.

If you plan to try this connector together with Change Data Capture or data replication tools, request the InfoCDC installation package and license in addition to the Kafka connector. Please refer to the InfoCDC documentation for more details on product installation and configuration.

The fastest way to try InfoConnect for Kafka depends on whether you already operate a Kafka platform and Kafka Connect infrastructure, as well as on the specific Kafka vendor. Below are the three most typical scenarios.

Existing Kafka Broker and Kafka Connect infrastructure

Customers who already operate their Kafka platform as well as Kafka Connect only need to install InfoConnect for Kafka into their existing Kafka Connect environment. Confluent customers can follow the Confluent Connector installation instructions, either via the Confluent CLI or by manually installing a previously downloaded connector. The process is similar for other Kafka vendors and platforms.

Kafka Connect workers may have to be restarted to discover the newly installed connector. After that, customers can configure IBM i connector operations.

Existing Kafka Broker, no Kafka Connect infrastructure

Customers who do not currently operate Kafka Connect infrastructure but already leverage Kafka foundational services (such as Kafka topics, Schema Registry, etc.) can follow their Kafka vendor's instructions to set up and run a minimal Kafka Connect configuration. For example, Confluent customers can use the Confluent Kafka Connect Quick Start guide to set up Kafka Connect in the appropriate deployment configuration, the simplest being stand-alone mode in their development environment.

Once Kafka Connect is up and running and can talk to the Kafka Broker, follow the steps above to download and install the InfoConnect for Kafka plugin, then configure IBM i connector operations.

No Kafka Broker or Kafka Connect infrastructure

For customers that do not currently operate any Kafka services, the easiest way to evaluate the IBM i connectivity is to install and run both Kafka Broker and Kafka Connect services in a single environment. Depending on the preferred Kafka edition and vendor, follow their respective quick start guides. For example:

  • Apache Kafka Quick Start - follow the instructions up to step 6 (install Kafka Connect)
  • Confluent Platform Docker Quick Start
  • Confluent Cloud Quick Start

We recommend the simplest stand-alone configuration for initial trial purposes. Our Engineering team can help you determine the appropriate topology and deployment model for Kafka Brokers and Kafka Connect components that meets your initial evaluation and production-ready target state requirements.

Once the Kafka Broker and Kafka Connect services are running successfully, follow the installation steps above to enable InfoConnect for Kafka connectors, then configure IBM i connector operations.

Customers who already run on different messaging and middleware platforms may consider leveraging InfoConnect Hub as a self-contained simplified IBM i integration component that surfaces IBM i business logic as REST APIs and supports most popular messaging stacks. Contact us to find out more details and discuss the best option with our team.

Sample Stand-Alone Confluent Docker Setup

Below is one way to quickly set up all required Kafka Broker, Schema Registry, Kafka Connect, and InfoConnect IBM i connector components using Confluent Platform. Note that, depending on the host OS and your specific configuration, you may have to make changes, for example to specify the fully qualified path to the license and truststore folders.

  1. Install Docker and review the sample Docker file

  2. Download the InfoConnect license file as400-license.lic into a local directory that is mapped into the container, for example (host directory, then container directory):

    /home/ubuntu/license/:/opt/
    
  3. Download and start all pre-requisite components

    docker-compose up -d
    

    Verify that all services are up and running

    docker-compose ps -a
    

    image

  4. Verify that the license is available in the connect container's /opt/ folder:

    docker exec -it connect bash
    

    Once in docker shell, execute:

    cd /opt/
    ls -l
    

    You should see the AS400 license file in that folder similar to the screen shot below

    image

  5. To verify that the InfoConnect for Kafka connector installation was successful, execute the following commands while in the container shell:

    cd /usr/share/confluent-hub-components/ 
    ls -l
    

    You should see IBM i (AS400) connector in the list, similar to below screen shot

    image

  6. Access Confluent Control Center at http://{environment-host}:9021. Connect components should list AS400 Data Queue Source and Sink, as well as Program Call Sink components similar to the screen shot below

    image
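
As an alternative to the visual check in Control Center, the installed plugins can be listed through the Kafka Connect REST API (`GET /connector-plugins`). Below is a minimal sketch, assuming the Connect worker's REST endpoint is reachable at `http://localhost:8083` (the usual default; your setup may map a different port) and that the InfoConnect class names contain `as400`, as they do in the sample configurations in this manual. The function names are illustrative.

```python
import json
import urllib.request


def find_as400_plugins(plugins):
    """Return the sorted class names from a /connector-plugins response that contain 'as400'."""
    return sorted(p["class"] for p in plugins if "as400" in p["class"].lower())


def fetch_plugins(connect_url="http://localhost:8083"):
    """Fetch the list of installed connector plugins from a Kafka Connect worker."""
    with urllib.request.urlopen(f"{connect_url}/connector-plugins", timeout=10) as resp:
        return json.load(resp)


# Example usage (requires a running Connect worker):
#   print(find_as400_plugins(fetch_plugins()))
```

An empty result suggests the plugin was not discovered, in which case restarting the Connect worker or checking the plugin path is a reasonable next step.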

AS400 Connection Configuration Properties

  • Connection

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Name | Unique label for the connector in your application. | Required | AS400SourceConnectorConnector_0 | name |
| AS400 URL | AS400 system connection URL. | Required | null | as400.url |
| User Id | AS400 system user. | Required | null | as400.userId |
| Password | AS400 system connection password. | Required | null | as400.password |
| License protocol | Protocol used to load the license file. See the License Management section. | Required | null | as400.license.protocol |
| Truststore file protocol | Protocol used to load the truststore file. See the Truststore Management section. | Optional | null | Truststore file protocol |
| IASP | Logical partitions in the systems. | Optional | null | as400.iasp |
| Library List | List of libraries, in addition to the library list associated with the user ID. The libraries must be separated with commas and will be added to the top of the library list. | Optional | null | as400.libraries |
| Secure Connection | Enable a secure connection with AS400 over an encrypted channel. | Optional | False | as400.secure.connection |
| Socket Timeout | Socket timeout, in ms. The default value of -1 means the default JVM SO_TIMEOUT will be used. | Optional | -1 | as400.socket.timeout |
| Time unit to be used for Socket Timeout | Socket timeout time unit. | Optional | MILLISECONDS | as400.socket.timeunit |
| Connection Retries | Number of times to retry establishing the connection internally before throwing an exception and passing control back to the Kafka connection manager. | Optional | 3 | as400.connection.retry |
| Reconnection Period | Time between internal reconnection retries, in ms. | Optional | 60000 | as400.reconnection.period |
| Time unit to be used for Reconnection Period | Reconnection period time unit. | Optional | MILLISECONDS | as400.reconnection.timeunit |
| Connection Time to Live | Max time (in seconds) that the connection can be used. | Optional | 0 | as400.connection.live |
| Reconnection Period time out | Time unit to be used for Connection Time to Live. | Optional | SECONDS | |
  • Connection (Optional)

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Operation Type | Type of operation to be performed on AS400: FILE = 0, PRINT = 1, COMMAND = 2, DATAQUEUE = 3, DATABASE = 4, RECORDACCESS = 5, CENTRAL = 6, SIGNON = 7 | Optional | 2 | as400.operation.type |
| CCSID | Coded Character Set Identifier: a numeric value that identifies the character encoding used on IBM systems, including the IBM i (AS/400) platform. | Optional | 0 | as400.ccsid |
| Pre Start Count Data Queue | - | Optional | 2 | as400.prestart.count.dq |
| Pre Start Count Command | - | Optional | 2 | as400.prestart.count.command |
| Cleanup Interval | - | Optional | 2 | as400.cleanup.interval |
| Max Connection | Maximum connections allowed. | Optional | 5 | as400.max.connection |
| Max Inactivity | Maximum time a connection can stay inactive. | Optional | 10 | as400.max.inactivity |
| Max Lifetime | Maximum lifetime of a connection. | Optional | 60000 | as400.max.lifetime |
| Max Use Count | - | Optional | 10 | as400.max.usecount |
| Max Use Time | - | Optional | 30000 | as400.max.usetime |
| Pre-Test Connection | - | Optional | true | as400.pretest.connection |
| Run Maintenance | - | Optional | true | as400.run.maintenance |
| Thread Used | - | Optional | true | as400.thread.used |
| Keep Alive | - | Optional | true | as400.keep.alive |
| Login Timeout | - | Optional | 0 | as400.login.timeout |
| Receive Buffer Size | - | Optional | 1000 | as400.receive.buffer.size |
| Send Buffer Size | - | Optional | 1000 | as400.send.buffer.size |
| So Linger | - | Optional | 0 | as400.so.linger |
| So Timeout | - | Optional | 0 | as400.so.timeout |
| TCP No Delay | - | Optional | true | as400.tcp.nodelay |
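
Before submitting a connector definition, it can help to confirm that the settings marked Required in the Connection table are present. Below is a minimal sketch, assuming the definition uses the `name` / `config` layout shown in the sample configurations in this manual; the helper name `missing_connection_keys` is illustrative and not part of the product.

```python
# Configuration keys marked "Required" in the Connection table.
REQUIRED_CONFIG_KEYS = (
    "as400.url",
    "as400.userId",
    "as400.password",
    "as400.license.protocol",
)


def missing_connection_keys(payload):
    """Return the required connection settings that are absent or empty."""
    missing = []
    if not payload.get("name"):
        missing.append("name")
    config = payload.get("config", {})
    missing.extend(key for key in REQUIRED_CONFIG_KEYS if not config.get(key))
    return missing


# Example usage with placeholder values:
#   payload = {"name": "AS400SourceConnector",
#              "config": {"as400.url": "your-ibm-i-host"}}
#   print(missing_connection_keys(payload))
```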

License Management:

InfoConnect for Kafka requires a valid license. Contact us to request a free trial license or purchase a long-term license. The Infoview Sales team will provide a license file, as400-license.lic, that includes the entitlement to work with specific IBM i servers.

InfoConnect for Kafka supports various internal and shared locations and protocols for storing the license:

| Protocol | Parameter to configure | Mandatory | Configuration key |
| --- | --- | --- | --- |
| FILE | path | required | as400.license.path |
| FILE | filename | required | license.fileName |
| S3 | S3 bucket path | required | s3.bucket |
| S3 | filename | required | license.fileName |
| S3 | S3 region | required | s3.region |
| S3 | Access key | required | s3.accessKey |
| S3 | Secret key | required | s3.secretKey |
| S3_IAM | S3 bucket path | required | s3.bucket |
| S3_IAM | filename | required | license.fileName |
| S3_IAM | S3 region | required | s3.region |
| FTP | Host | required | ftp.host |
| FTP | directory path | required | ftp.dir.path |
| FTP | filename | required | license.fileName |
| FTP | username | required | ftp.username |
| FTP | password | required | ftp.password |
| CLASSPATH | filename | required | license.fileName |
| HTTP | HTTP Host | required | http.url |
| HTTP | HTTP Directory | required | http.dir.path |
| HTTP | filename | required | license.fileName |
| HTTP | HTTP Username | required | http.username |
| HTTP | HTTP Password | required | http.password |
| HTTPS | HTTPS Host | required | https.url |
| HTTPS | HTTPS Directory | required | https.dir.path |
| HTTPS | filename | required | license.fileName |
| HTTPS | HTTPS Username | required | https.username |
| HTTPS | HTTPS Password | required | https.password |
| BROKER | License Topic | required | license.licenseTopic |
| DIRECT | License Text | required | license.licenseText |
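
The protocol-specific license settings can be expressed as a lookup table, which makes it easy to check a connector configuration before deployment. Below is a minimal sketch built from the keys listed in the License Management table; the helper name `missing_license_keys` is illustrative and not part of the product.

```python
# Required configuration keys per license protocol, from the License Management table.
LICENSE_KEYS = {
    "FILE": ["as400.license.path", "license.fileName"],
    "S3": ["s3.bucket", "license.fileName", "s3.region", "s3.accessKey", "s3.secretKey"],
    "S3_IAM": ["s3.bucket", "license.fileName", "s3.region"],
    "FTP": ["ftp.host", "ftp.dir.path", "license.fileName", "ftp.username", "ftp.password"],
    "CLASSPATH": ["license.fileName"],
    "HTTP": ["http.url", "http.dir.path", "license.fileName", "http.username", "http.password"],
    "HTTPS": ["https.url", "https.dir.path", "license.fileName", "https.username", "https.password"],
    "BROKER": ["license.licenseTopic"],
    "DIRECT": ["license.licenseText"],
}


def missing_license_keys(config):
    """Return the license settings missing for the protocol selected in config."""
    protocol = config.get("as400.license.protocol")
    if protocol not in LICENSE_KEYS:
        raise ValueError(f"Unknown license protocol: {protocol!r}")
    return [key for key in LICENSE_KEYS[protocol] if not config.get(key)]


# Example usage:
#   missing_license_keys({"as400.license.protocol": "FILE",
#                         "as400.license.path": "/opt"})
```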

Truststore Management:

The IBM i connector requires a truststore file, "info400new.truststore", when the secure connection property is set to true. The truststore enables access to specific IBM i system(s) over a TLS connection.

The truststore file can be loaded using the HTTP, HTTPS, FTP, S3, FILE, or CLASSPATH protocol, selected in the connector settings. The specific parameters to configure depend on the chosen protocol.

image

Configure the following properties based on the protocol type:

| Protocol | Parameter to configure | Mandatory | Configuration key |
| --- | --- | --- | --- |
| FILE | path | required | as400.tlsFile.path |
| FILE | filename | required | truststore.fileName |
| S3 | S3 bucket path | required | s3.bucket.tls |
| S3 | filename | required | truststore.fileName |
| S3 | S3 region | required | s3.region.tls |
| S3 | Access key | required | s3.accessKey.tls |
| S3 | Secret key | required | s3.secretKey.tls |
| S3_IAM | S3 bucket path | required | s3.bucket.tls |
| S3_IAM | filename | required | truststore.fileName |
| S3_IAM | S3 region | required | s3.region.tls |
| FTP | Host | required | ftp.host.tls |
| FTP | directory path | required | ftp.dir.path.tls |
| FTP | filename | required | truststore.fileName |
| FTP | username | required | ftp.username.tls |
| FTP | password | required | ftp.password.tls |
| CLASSPATH | filename | required | license.fileName |
| HTTP | HTTP Host | required | http.url.tls |
| HTTP | HTTP Directory | required | http.dir.path.tls |
| HTTP | filename | required | truststore.fileName |
| HTTP | HTTP Username | required | http.username.tls |
| HTTP | HTTP Password | required | http.password.tls |
| HTTPS | HTTPS Host | required | https.url.tls |
| HTTPS | HTTPS Directory | required | https.dir.path.tls |
| HTTPS | filename | required | truststore.fileName |
| HTTPS | HTTPS Username | required | https.username.tls |
| HTTPS | HTTPS Password | required | https.password.tls |

Connector Operations

The connector provides support for the following operations:

| Operation | Description |
| --- | --- |
| InfoCDC / Data Queue Source | The connector continuously monitors a designated data queue for new messages. |
| Data Queue Sink | The connector writes messages to a data queue. |
| Program Call Sink | The connector executes an IBM i program. |
| Screen Automation (RPA) Sink | The connector automates green screen user navigation. |

AS400 Source Connector Configuration Properties

Configure these connector properties.

image

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Data Queue | Name of the data queue to read from. | Required | null | as400.read.dataqueue.name |
| Library | Library of the data queue to read from. | Required | null | as400.read.dataqueue.library |
| Key | Key used when reading messages from the data queue. Must be specified for keyed data queues and left blank for non-keyed data queues. | Optional | null | as400.read.dataqueue.key |
| Key Search Type | Must be specified for keyed data queues. Available search types are equal, not equal, greater than, less than, greater than or equal, less than or equal. | Optional | null | as400.read.dataqueue.key.search.type |
| Keep messages in Queue | Ensure it is unchecked unless the intent is to leave the message in the queue after reading. | Optional | true | as400.source.keep.message |
| Format File Name | Optional parameter that allows treating a data queue entry as an externally defined data structure. When defined, the connector dynamically retrieves the record format from the specified IBM i file and parses the received data queue entry into a map of field name / value pairs. The connector performs the type conversion, supporting all types such as packed, date / time, etc. | Optional | null | as400.source.format.name |
| Format File Library | When a format file is specified, the format file library can also be specified; otherwise the format file is located based on the connection library list. | Optional | null | as400.source.file.library |
| Format File Exception Action | If a message fails due to a format file error, the connector either discards the message or writes it back to the queue, depending on this option. | Optional | discard | as400.source.format.action |
| Is InfoCDC Configured | Should be true if InfoCDC is subscribed. | Optional | true | as400.isInfoCDCConfigured |
| Number of Consumers | Number of consumers. | Optional | 4 | as400.source.consumer.numbers |

JsonConverter(Source)

Note: Here is a sample configuration for the AS400 source connector with JsonConverter.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}
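
A connector definition in this `name` / `config` layout is typically submitted to a Kafka Connect worker through its REST API (`POST /connectors`). Below is a minimal sketch, assuming the worker listens on `http://localhost:8083` (the usual default; adjust for your environment). Note that the `/* ... */` and `//` comment lines in the sample are not valid JSON and must be removed, or turned into real properties, before the definition is sent. The helper name `build_create_request` and the shortened payload are illustrative.

```python
import json
import urllib.request


def build_create_request(payload, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST request that creates a connector."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Shortened payload using the placeholder values from the sample above.
payload = {
    "name": "AS400SourceConnector",
    "config": {
        "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
        "as400.url": "your-ibm-i-host",
        "as400.userId": "your-username",
        "as400.password": "your-password",
        "as400.license.protocol": "FILE",
        "as400.license.path": "/opt",
        "license.fileName": "as400-license.lic",
        "as400.read.dataqueue.name": "your-data-queue-name",
        "as400.read.dataqueue.library": "your-Library",
        "source.kafka.topic": "Test",
    },
}

request = build_create_request(payload)
# To actually create the connector (requires a running Connect worker):
#   with urllib.request.urlopen(request, timeout=30) as resp:
#       print(resp.status, resp.read().decode("utf-8"))
```

Building the request separately from sending it keeps the definition easy to inspect or log before it reaches the worker.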

When reading data from a keyed data queue on the IBM i (AS/400) platform, specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

In this example, as400.read.dataqueue.key is set to your-key (a placeholder for the actual key value) and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value.

The other available search types are equal, not equal, greater than, less than, and less than or equal.
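
The key and search type are the only additions needed for a keyed data queue, so they can be layered on top of an existing source configuration. Below is a minimal sketch; the helper name `with_key_filter` is illustrative, and the accepted search type spellings are an assumption taken from the parameter table and the sample above, so confirm them against your connector version.

```python
# Assumed spellings, taken from the source connector parameter table.
SEARCH_TYPES = {
    "equal",
    "not equal",
    "greater than",
    "less than",
    "greater than or equal",
    "less than or equal",
}


def with_key_filter(config, key, search_type):
    """Return a copy of config with the keyed data queue properties added."""
    if search_type not in SEARCH_TYPES:
        raise ValueError(f"Unsupported search type: {search_type!r}")
    updated = dict(config)  # leave the caller's dictionary untouched
    updated["as400.read.dataqueue.key"] = key
    updated["as400.read.dataqueue.key.search.type"] = search_type
    return updated


# Example usage:
#   keyed = with_key_filter(base_config, "your-key", "greater than or equal")
```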

JsonSchemaConverter(Source)

Note: Here is a sample configuration for the AS400 source connector with JsonSchemaConverter and Schema Registry.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

When reading data from a keyed data queue on the IBM i (AS/400) platform, specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

In this example, as400.read.dataqueue.key is set to your-key (a placeholder for the actual key value) and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value.

The other available search types are equal, not equal, greater than, less than, and less than or equal.

AvroConverter(Source)

Note: Here is a sample configuration for the AS400 source connector with AvroConverter.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

When reading data from a keyed data queue on the IBM i (AS/400) platform, specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

In this example, as400.read.dataqueue.key is set to your-key (a placeholder for the actual key value) and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value.

The other available search types are equal, not equal, greater than, less than, and less than or equal.

Source Acknowledgement

Acknowledgement can be done in two ways: either by writing an acknowledgement to the response data queue or by calling a program with the necessary parameters.

To write an acknowledgement back to the data queue, the following configuration is required:

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Data Queue | Will update the response back to the data queue | Optional | null | as400.write.dataqueue.name |
| Library | Response data queue library | Optional | null | as400.write.dataqueue.library |
| is Keyed DataQ | Response data queue Expression | Optional | null | as400.write.dataqueue.key |

To call a program for acknowledgement, the following configuration is required:

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Program name | Name of the program | Optional | null | as400.program.name |
| Program Library | Library of the program you want to invoke | Optional | null | as400.program.library |
| Program Library List | Library list | Optional | null | as400.program.libraryList |

Here is the configuration to send the response through a program call. The program INFOCDCCOM/CDCPGMACKR is in place to accept and process the acknowledgements.

    "source.ack.type": "Program",
    "as400.program.name": "CDCPGMACKR",
    "as400.program.library": "INFOCDCCOM",
    "as400.program.libraryList": "INFOCDCCOM, INFOCDCDTA",

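The program-call acknowledgement settings above can be generated and merged into an existing source connector configuration. Below is a minimal sketch using only the keys shown in the fragment above; the helper name `program_ack_settings` is illustrative and not part of the product.

```python
def program_ack_settings(program, library, library_list):
    """Return the acknowledgement-by-program-call settings as a dictionary."""
    return {
        "source.ack.type": "Program",
        "as400.program.name": program,
        "as400.program.library": library,
        # Libraries are joined with ", " to match the fragment above.
        "as400.program.libraryList": ", ".join(library_list),
    }


ack = program_ack_settings("CDCPGMACKR", "INFOCDCCOM", ["INFOCDCCOM", "INFOCDCDTA"])
# Merge into an existing connector "config" dictionary, for example:
#   connector_config.update(ack)
```
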
AS400 Data Queue Sink Connector Configuration Properties

Configure these connector properties.

image

| Parameter | Description | Mandatory | Default Value | Configuration key |
| --- | --- | --- | --- | --- |
| Data Queue | Name of the data queue to write to. | Required | null | as400.write.dataqueue.name |
| Library | Library of the data queue to write to. | Required | null | as400.write.dataqueue.library |
| Is Keyed DataQ | Set to true when writing to a keyed data queue; leave as false for non-keyed data queues. | Optional | false | as400.write.dataqueue.key |
| Format File Name | Optional parameter that allows treating a data queue entry as an externally defined data structure. When defined, the connector dynamically retrieves the record format from the specified IBM i file and parses the received data queue entry into a map of field name / value pairs. The connector performs the type conversion, supporting all types such as packed, date / time, etc. | Optional | null | as400.sink.format.name |
| Format File Library | When a format file is specified, the format file library can also be specified; otherwise the format file is located based on the connection library list. | Optional | null | as400.sink.file.library |
| DQ Entry Length | Max DQ entry length. When specified and greater than 0, the parameter value will be truncated to fit the max length. | Optional | 0 | as400.dq.entry.length |
| DQ Key Length | Max DQ key length. When specified and greater than 0, the parameter value will be used (instead of dynamically retrieving it from the DQ definitions on the server). | Optional | null | as400.dq.key.length |

JsonConverter(Sink)

Note: Here is a sample configuration for the AS400 Data Queue Sink connector with JsonConverter and a dead letter queue.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "json_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

To write to a keyed data queue on the IBM i (AS/400) platform, set the as400.write.dataqueue.key property to true in your configuration. This property tells the connector that the target is a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "json_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}
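The samples above contain `/* */` and `//` comments, which strict JSON parsers reject, so it can be convenient to build the payload programmatically. The following is a minimal Python sketch, assuming the connector is registered through the standard Kafka Connect REST API (`POST /connectors`). The helper names `build_dataqueue_sink_config` and `build_create_request`, the worker URL `http://localhost:8083`, and all placeholder values are illustrative assumptions and not part of the product.

```python
import json
import urllib.request


def build_dataqueue_sink_config(name, topic, queue, library, keyed=False):
    """Return a Kafka Connect payload for the Data Queue sink connector.

    Property names are copied from the samples above; values are placeholders.
    """
    config = {
        "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "topics": topic,
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "json_dlq",
        "as400.url": "your-ibm-i-host",
        "as400.userId": "your-username",
        "as400.password": "your-password",
        "as400.write.dataqueue.name": queue,
        "as400.write.dataqueue.library": library,
    }
    if keyed:
        # Only added for keyed data queues, as described above.
        config["as400.write.dataqueue.key"] = "true"
    return {"name": name, "config": config}


def build_create_request(connect_url, payload):
    """Build (but do not send) the POST /connectors request."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = build_dataqueue_sink_config(
    "AS400DataQueueSinkConnectorConnector_0", "Test",
    "your-data-queue-name", "your-library", keyed=True,
)
request = build_create_request("http://localhost:8083", payload)
# urllib.request.urlopen(request) would submit it to a running Connect worker.
print(json.dumps(payload, indent=2))
```

Building the payload from a dictionary guarantees that what is sent is valid JSON, and the keyed-queue property is only present when it is needed.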

JsonSchemaConverter(Sink)

Note: Here is a sample configuration for the AS400 Data Queue Sink connector with JsonSchemaConverter, Schema Registry, and a dead letter queue.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

To write to a keyed data queue on the IBM i (AS/400) platform, set the as400.write.dataqueue.key property to true in your configuration. This property tells the connector that the target is a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

AvroConverter(Sink)

Note: Here is a sample configuration for the AS400 Data Queue Sink connector with AvroConverter, Schema Registry, and a dead letter queue.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "avro_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

To write to a keyed data queue on the IBM i (AS/400) platform, set the as400.write.dataqueue.key property to true in your configuration. This property tells the connector that the target is a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "avro_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

AS400 Program Call Sink Connector Configuration Properties

Configure these connector properties

image

| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Program Name | AS400 program name | Required | null | as400.program.name |
| Program Library | Program library name | Required | null | as400.program.library |
| Program Parameters | List of definitions and value references of program parameters | Optional | null | as400.program.parameters |
| Procedure Name | Name of the procedure | Optional | null | as400.procedure.name |
| Procedure Returns Value | Indicator if the program procedure returns a value. | Optional | false | as400.procedure.returnsValue |
| Threadsafe | Indicator if the program is thread safe | Optional | false | as400.threadsafe |
| Sink Target Topic | Sink target topic is a topic to push program call output | Required | null | sink.kafka.topic |
| Kafka Partition Key | Kafka partition key to push record to topic in the specific location | Required | null | sink.kafka.partition.key |

Note: Here is a sample configuration for the AS400 Program Call Sink connector.

{
  "name": "AS400ProgramCallSinkConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "CRTORDER_INPUT",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.program.name": "program-name",
    "as400.program.library": "KFKDEMOS",
    "as400.program.libraryList": "comma-separated-list-of-libraries",
    "as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, 
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": 
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", 
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": 
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { 
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, 
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", 
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
    "as400.threadsafe": "false",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

Note: The connector can call both Programs (*PGM objects) and Service Programs (*SRVPGM objects) on the IBM i (AS/400) platform. The as400.procedure.name property is used as follows when calling these objects:

For PGM Object:

When calling a Program (*PGM object), you typically don't need to specify a procedure name, as Programs in this context are generally standalone units of code that are executed without calling a specific procedure within them.

For SRVPGM Object:

Service Programs (*SRVPGM objects) are designed to be reused by multiple programs, and they can contain multiple exported procedures that can be called individually. When calling a Service Program and you intend to use a specific procedure within it, the procedure name must match the exported name of the procedure you want to execute.

The as400.procedure.name property can be set in the connector configuration to specify the procedure name when calling a Service Program.

Note: Here's an example of how you might use it in your connector configuration:

{
  "name": "AS400ProgramCallSinkConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "CRTORDER_INPUT",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.program.name": "program-name",
    "as400.program.library": "KFKDEMOS",
    "as400.program.libraryList": "comma-separated-list-of-libraries",
    "as400.program.libraryList.mode": "ADD_FIRST",
    "as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, 
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": 
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", 
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": 
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { 
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, 
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", 
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
    "as400.threadsafe": "false",
    "value.converter.schemas.enable": "false",
    "as400.procedure.name":"your-procedure-name"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

IBM i program parameter definitions:

 d main            pr                  extpgm('CRTORDER')                     
 d  srcOrder                      8p 0                                        
 d  trgtOrder                     8p 0                                        
 d  ordAmt                       10p 2                                        
 d  ordSts                       10a                                          
 d  nbrLines                      4p 0                                        
 d    linesIn                          likeds(linesInFmt) dim(10)  

 d linesInfmt      DS                  Dim(10) qualified               
 d   itemNo                      10a                                   
 d   qty                          5p 0                                 
 d   price                       10p 2  

Example: Input Structure

{
    "sourceOrderId": 11934700,
    "targetOrderId": 11934700,   
    "orderAmt": 1000.00,
    "orderStatus": "Activated",
    "noOfItems": 2,
    "linesIn": [
        {
            "itemNo": "111",
            "quantity": 10,
            "price": 10.00
        },
        {
            "itemNo": "222",
            "quantity": 90,
            "price": 10.00
        }
    ] 
}

as400.program.parameters: the program parameters must be described in the following structure:

{
    "params": [
        {
            "parameterName": "sourceOrderId",
            "sourceFieldName": "$['sourceOrderId']",
            "dataType": "PACKED",
            "length": 8,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": []
        },
        {
            "parameterName": "targetOrderId",
            "sourceFieldName": "$['targetOrderId']",
            "dataType": "PACKED",
            "length": 8,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": []
        },
        {
            "parameterName": "orderAmt",
            "sourceFieldName": "$['orderAmt']",
            "dataType": "PACKED",
            "length": 10,
            "decimalPositions": 2,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": []
        },
        {
            "parameterName": "orderStatus",
            "sourceFieldName": "$['orderStatus']",
            "dataType": "STRING",
            "length": 10,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": []
        },
        {
            "parameterName": "numberOfLines",
            "sourceFieldName": "$['noOfItems']",
            "dataType": "PACKED",
            "length": 4,
            "decimalPositions": 0,
            "usage": "IN",
            "count": 1,
            "dataStructureElements": []
        },
        {
            "parameterName": "orderLines",
            "sourceFieldName": "$['linesIn']",
            "dataType": "STRUCTURE",
            "length": 0,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 10,
            "dataStructureElements": [
                {
                    "parameterName": "itemNo",
                    "sourceFieldName": "$['itemNo']",
                    "dataType": "STRING",
                    "length": 10,
                    "decimalPositions": 0,
                    "usage": "INOUT",
                    "count": 1,
                    "dataStructureElements": []
                },
                {
                    "parameterName": "quantity",
                    "sourceFieldName": "$['quantity']",
                    "dataType": "PACKED",
                    "length": 5,
                    "decimalPositions": 0,
                    "usage": "INOUT",
                    "count": 1,
                    "dataStructureElements": []
                },
                {
                    "parameterName": "price",
                    "sourceFieldName": "$['price']",
                    "dataType": "PACKED",
                    "length": 10,
                    "decimalPositions": 2,
                    "usage": "INOUT",
                    "count": 1,
                    "dataStructureElements": []
                }
            ]
        }
    ]
}

Note: The field names in the input structure must match the respective sourceFieldName values so that the connector can map the values accordingly.
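The matching rule in the note above can be checked before deployment. Below is a minimal Python sketch, assuming every sourceFieldName uses the bracket form shown in the sample (`$['name']`) and that fields of a STRUCTURE parameter are looked up inside each element of the referenced list. The helper `missing_fields` is hypothetical and only illustrates the check; it does not reproduce how the connector itself resolves fields.

```python
import re

# Matches the bracket-notation references used in the sample, e.g. $['orderAmt'].
FIELD_REF = re.compile(r"^\$\['([^']+)'\]$")


def missing_fields(param_defs, message):
    """Return the referenced field names that have no matching key in message.

    Nested dataStructureElements are checked against every element of the
    corresponding list (or the single object) in the message.
    """
    missing = []
    for param in param_defs:
        match = FIELD_REF.match(param["sourceFieldName"])
        if not match:
            # Unsupported reference syntax for this sketch: report it as-is.
            missing.append(param["sourceFieldName"])
            continue
        field = match.group(1)
        if field not in message:
            missing.append(field)
            continue
        children = param.get("dataStructureElements") or []
        if children:
            value = message[field]
            rows = value if isinstance(value, list) else [value]
            for row in rows:
                missing.extend(f"{field}.{name}" for name in missing_fields(children, row))
    return missing


params = [
    {"parameterName": "sourceOrderId", "sourceFieldName": "$['sourceOrderId']",
     "dataStructureElements": []},
    {"parameterName": "orderLines", "sourceFieldName": "$['linesIn']",
     "dataStructureElements": [
         {"parameterName": "itemNo", "sourceFieldName": "$['itemNo']",
          "dataStructureElements": []},
         {"parameterName": "quantity", "sourceFieldName": "$['quantity']",
          "dataStructureElements": []},
     ]},
]
message = {
    "sourceOrderId": 11934700,
    "linesIn": [{"itemNo": "111", "quantity": 10}, {"itemNo": "222"}],
}
print(missing_fields(params, message))  # ['linesIn.quantity']
```

Note that the check uses sourceFieldName, not parameterName: in the sample, the parameter numberOfLines reads from the input field noOfItems.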

Once you have the above structure, it needs to be stringified (serialized into a single escaped JSON string). The result looks like this:

{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, 
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": 
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", 
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": 
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { 
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, 
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", 
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }

The configuration for the Program Call Sink connector is now complete, and the connector is ready to be deployed.
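The stringification step does not have to be done by hand. As a minimal Python sketch (only the first parameter from the sample is included for brevity, and the variable names are illustrative), the standard `json` module can produce the escaped value for as400.program.parameters:

```python
import json

# Parameter structure as described above (shortened to one parameter).
params = {
    "params": [
        {
            "parameterName": "sourceOrderId",
            "sourceFieldName": "$['sourceOrderId']",
            "dataType": "PACKED",
            "length": 8,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": [],
        }
    ]
}

# First pass: turn the structure into a single-line JSON string.
stringified = json.dumps(params)

# Second pass: embedding that string in the connector config escapes
# the inner double quotes as \" when the config itself is serialized.
config = {"as400.program.parameters": stringified}
config_json = json.dumps(config, indent=2)
print(config_json)
```

Serializing twice is what produces the `\"` sequences seen in the sample configuration: the inner document becomes a plain string value of the outer one.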

Schema Registry Configuration

Schema Registry must be configured for the schema converters listed below; otherwise, registering an updated schema may fail after the format file has been changed.

Avro

io.confluent.connect.avro.AvroConverter

JSON Schema

io.confluent.connect.json.JsonSchemaConverter

Schema Registry can be configured in 3 ways:

  1. Through Confluent Control Center

Open the Topics menu and choose the topic to which new messages are published. Then click the Schema field, where you can see the already registered schema. Click the pull-down menu ... and choose Compatibility settings.

image

Then set the compatibility level to NONE and save the changes.

image

These changes apply only to the edited topic. If you have other topics where messages are published with a schema, repeat these steps for each of them. If you don't want to repeat these steps for every new topic, use one of the other options below.

  2. Through Confluent Platform properties file

Open the Confluent installation folder and go to the /etc/schema-registry/ folder.

Edit the schema-registry.properties file with the following command:

nano schema-registry.properties

Add the following property at the end of the file:

schema.compatibility.level=none

Save the changes and restart Confluent Platform if needed to apply them. NONE will then be the default compatibility level for new schemas.

  3. Through Docker Compose File

Add the following property to the environment section of the schema-registry container:

SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: none

Then, if needed to apply the changes, restart the Docker container with the following command:

docker-compose up -d

NONE will then be the default compatibility level for new schemas.
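Whichever option is used, the resulting setting can be verified afterwards. The sketch below assumes the Schema Registry REST API is reachable at `http://localhost:8081` (the URL used in the samples above) and that `GET /config` and `GET /config/{subject}` return a JSON body with a `compatibilityLevel` field. The helper names and the subject name `Test-value` are illustrative assumptions; the subject name presumes the default topic-based subject naming.

```python
import json
import urllib.parse
import urllib.request


def config_url(registry_url, subject=None):
    """Return the /config URL for the global default or for one subject."""
    base = registry_url.rstrip("/") + "/config"
    if subject is None:
        return base
    return base + "/" + urllib.parse.quote(subject, safe="")


def parse_compatibility(body):
    """Extract the compatibility level from a GET /config response body."""
    return json.loads(body)["compatibilityLevel"]


def read_compatibility(registry_url, subject=None):
    """Query a running Schema Registry (requires network access)."""
    with urllib.request.urlopen(config_url(registry_url, subject)) as response:
        return parse_compatibility(response.read().decode("utf-8"))


# Offline demonstration of the URL building and response parsing.
print(config_url("http://localhost:8081"))
print(config_url("http://localhost:8081", "Test-value"))
print(parse_compatibility('{"compatibilityLevel": "NONE"}'))
```

Calling `read_compatibility("http://localhost:8081")` against a running registry should report NONE once option 2 or 3 has been applied. A subject-level query may return an error if no subject-specific setting exists.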

Log Exceptions

The connector can log all critical exceptions to a dedicated topic (as400-error-log) when log.exceptions is set to true. The default value of log.exceptions is false, which means the connector does not log any exceptions to the topic.

Contact Us

Contact us for connector pricing info, trial license, or support questions.