User Manual

Introduction

Infoview Systems' InfoConnect suite of products reduces the stress and impact of integrating IBM i / AS400 legacy systems for development teams. It minimizes the time and resources required to build integrations manually, while enabling non-IBM i developers to access legacy business logic and data directly within their modern integration development stack. Certified and rigorously tested by Confluent, the connector is designed to expedite IBM i / AS400 integrations with other systems and services.

We are a global cross-platform service team with a unique fluency in both legacy and modern technology stacks, including Confluent and IBM i / AS400. Infoview's dedicated customer success representatives coordinate just-in-time technical assistance and support for client teams, ensuring you have all the help you need, when you need it. We are more than happy to provide a trial license for our products, participate in discovery sessions, conduct live demos for typical integration scenarios with our Gateway products, and assist with or perform a proof of concept based on specific use cases.

Feel free to Contact us for connector pricing information, trial licenses, or any support-related questions.

IBM i / AS400 Connector for Confluent Overview

The IBM i / AS400 was first introduced in 1988. Since then, it has evolved while keeping pace with new concepts and technologies. It is a remarkably stable and robust modern all-purpose system that surprisingly requires little to no management. The system can securely and predictably run core line of business applications, focusing on quality of service and availability. It offers a compelling total cost of ownership for integrated on-premises solutions. IBM has made several changes to the server and OS name (iSeries, System i, IBM i, Power Systems for i), but most still refer to it as AS/400.

The IBM i platform offers several integration options, including PHP, Java, WebSphere, specialized lightweight web service containers, FTP, SMTP/emails, DB2 interfaces, data queues, integrated file systems - IFS, as well as an abundance of products offered by IBM and third-party vendors. The main benefit of using "native" options, such as Program Calls and Data Queues, is that IBM i development teams do not have to learn another language or purchase and support another technology to build the integration layer. They can easily communicate with external systems using only traditional development tools. The Program Call is the most straightforward and low-code option for exposing IBM i business logic as a reusable asset.

The IBM i connector enables direct program calls from Kafka applications, passing parameters into the program and receiving results back in real time.

Data queues are native IBM i objects designed primarily for inter-process communication. They are lightweight persistent queues that support keyed access and FIFO or LIFO ordering. Most integration use cases can be implemented with a pair of request and response data queues. In this setup, the source system places a message in the request data queue and waits for an acknowledgement message to be placed in the response data queue. The target system receives and processes the message from the request data queue, then places the acknowledgement in the response data queue.
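The request and response data queue pattern described above can be sketched in a few lines of Python. This is only an in-memory illustration: the two `queue.Queue` objects stand in for IBM i data queues, and the function name `round_trip` and the `ACK:` prefix are assumptions made for the example, not part of the connector.

```python
import queue
import threading


def round_trip(message, timeout=5.0):
    """Send one request and wait for its acknowledgement."""
    # In-memory stand-ins for the request and response data queues (FIFO).
    request_dq = queue.Queue()
    response_dq = queue.Queue()

    def target_system():
        # The target system receives the request, processes it,
        # and places an acknowledgement in the response queue.
        request = request_dq.get(timeout=timeout)
        response_dq.put("ACK:" + request)

    worker = threading.Thread(target=target_system)
    worker.start()

    # The source system places the request and waits for the acknowledgement.
    request_dq.put(message)
    ack = response_dq.get(timeout=timeout)
    worker.join()
    return ack


print(round_trip("order-42"))  # prints "ACK:order-42"
```

In a real deployment the source and target systems are separate processes, and the queues are persistent IBM i objects rather than Python objects.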

The IBM i connector makes it easy to build Kafka code that communicates with IBM i applications. Infoview Systems has also developed the IBM i 'Web Transaction Framework', which makes it very fast and easy to develop IBM i code that communicates with Kafka.

Prerequisites

The connector is designed to operate with IBM i objects (such as programs and data queues). Therefore, this document assumes that you are familiar with IBM i operational and development environments and tools. Additionally, it assumes your familiarity with Confluent Kafka, Kafka Connect, and Confluent Control Center. The document offers configuration examples and provides a detailed explanation of each, along with the properties required to run the connector effectively within Confluent Control Center.

IBM i server configuration and requirements

  • IBM i must have the following ports accessible from the Confluent runtime: 446, 448, 449, 8470, 8472, 8473, 8475, 8476, 9470, 9472, 9473, 9475, and 9476.

  • IBM i must have the following host servers running in the QSYSWRK subsystem: *CENTRAL, *DTAQ, *RMTCMD, *SIGNON, and *SVRMAP.

  • If a secure TLS connection is being used, the TLS certificate must be applied to the following services in Digital Certificate Manager: Central, Data Queue, Remote Command, Sign on, and DDM / DRDA services.

  • The IBM i user ID must be authorized to perform operations on the intended IBM i objects.

  • If there is additional security software in place that restricts remote execution functionality, the IBM i user ID defined for the connector configuration must be granted permission to execute remote calls and access database, IFS, and DDM services.
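Before configuring the connector, it can help to confirm that the listed ports are reachable from the machine that runs Kafka Connect. The following is a minimal sketch using only the Python standard library. The function name `check_ports` is hypothetical, and a successful TCP connection only shows that the port is open, not that the corresponding host server is correctly configured.

```python
import socket

# Ports listed in the requirements above.
IBM_I_PORTS = [446, 448, 449, 8470, 8472, 8473, 8475, 8476,
               9470, 9472, 9473, 9475, 9476]


def check_ports(host, ports=IBM_I_PORTS, timeout=3.0):
    """Return a dict mapping each port to True if a TCP connection succeeds."""
    results = {}
    for port in ports:
        try:
            # create_connection raises OSError on refusal, timeout, or DNS failure.
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results
```

For example, `check_ports("your-ibm-i-host")` returns one entry per port. Any port reported as `False` should be checked against firewall rules and the status of the host servers.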

Dependencies

  • Access to the IBM i server is required.

  • Access to the Confluent Kafka platform is necessary.

Compatibility Matrix

| Application/Service | Version |
|---|---|
| Confluent Kafka | 6.0.0 or higher |
| Confluent Control Center | 6.0.0 or higher |
| IBM i / OS400 | V5R4 or higher |

Connector Operations

The IBM i connector is an operation-based connector. This implies that when you integrate the connector into your Kafka Connect cluster, you are required to configure a specific operation that the connector will carry out. The connector provides support for the following operations:

| Operation | Description |
|---|---|
| Read Data Queue (Message Source) | The connector continuously monitors a designated data queue for new messages. |
| Write to Data Queue | The connector can write messages to a data queue. |
| Program Call | The connector can execute an IBM i program. |

Confluent Setup

  • Steps to set up a Confluent Kafka standalone environment on a local system:

    https://docs.confluent.io/5.5.0/quickstart/ce-quickstart.html

  • Steps to set up a Confluent Kafka standalone environment in the cloud:

    https://dzone.com/articles/installing-and-configuring-confluent-platform-kafk

Once Confluent Kafka is installed, follow the steps below to install the connector.

Install the AS400 Connector

Connectors are packaged as Kafka Connect plugins. Kafka Connect isolates each plugin so that the plugin libraries do not conflict with each other. Download and extract the ZIP file for your connector, then follow the installation instructions below.

  • In Standalone mode

    1. The kafka-connect-as400 connector can be installed directly from Confluent Hub using

          confluent-hub install infoviewsystems/kafka-connect-as400:latest
      
    2. The command normally installs the connector into /confluent/share/confluent-hub-component, so there is no need to extract it manually. If you downloaded the ZIP file instead, extract its content into the desired location (preferred: /confluent/share/java or /confluent/share/confluent-hub-component).

    3. Start Confluent Control Center using the command “confluent local services start”.

    image

    4. Control Center should now be up and running; this can be verified at http://{HOST}:9021

    image

    5. The source and sink connectors are now ready to configure. Sample configurations are provided in the sections that follow.
  • Confluent setup and connector installation through Docker

    1. A sample docker-compose.yml file is provided below. Depending on the host OS and your specific configuration, you may have to make changes, for example to specify the fully qualified paths to the license and truststore folders.

      Docker File

    2. In that file, check the volumes entry of the connect service:

      volumes:

      /home/ubuntu/license/:/opt/

      Create a license directory on the local system (for example, mkdir license) and place the as400-license.lic file in that directory.

      When the services are started from the docker-compose.yml file, this volume mapping makes the license file as400-license.lic available inside the Docker container under /opt/.

    3. Execute the following command:

      docker-compose up -d

      This downloads all required images from Docker Hub and starts the services.

      Once the download has completed, verify that all services are up and running with the following command:

      docker-compose ps -a

      image

    4. To verify that the license file is available in /opt/, open an interactive shell in the Kafka Connect service container:

      docker exec -it connect bash

      Then execute the following commands:

      cd /opt/

      ls -l

      See the screenshot for reference:

      image

    5. To verify that infoviewsystems-as400-kafka-connect is installed, execute the following commands:

      cd /usr/share/confluent-hub-components/

      ls -l

      See the screenshot for reference:

      image

    6. Control Center should now be up and running; this can be verified at http://{HOST}:9021

      image

    7. The source and sink connectors are now ready to configure. Sample configurations are provided in the sections that follow.

License Management:

The IBM i connector requires a license file "as400-license.lic" from Infoview to enable access to specific IBM i system(s). The license file can be stored in different locations and loaded through different protocols, such as S3, HTTP/HTTPS, FTP, FILE, or CLASSPATH. The chosen protocol and its parameters must be set in the connector configuration.

Depending on the protocol type, configure the properties below.

| Protocol | Parameter to configure | Mandatory | Configuration key |
|---|---|---|---|
| FILE | path | required | as400.license.path |
| FILE | filename | required | license.fileName |
| S3 | S3 bucket path | required | s3.bucket |
| S3 | filename | required | license.fileName |
| S3 | S3 region | required | s3.region |
| S3 | Access key | required | s3.accessKey |
| S3 | Secret key | required | s3.secretKey |
| S3_IAM | S3 bucket path | required | s3.bucket |
| S3_IAM | filename | required | license.fileName |
| S3_IAM | S3 region | required | s3.region |
| FTP | Host | required | ftp.host |
| FTP | directory path | required | ftp.dir.path |
| FTP | filename | required | license.fileName |
| FTP | username | required | ftp.username |
| FTP | password | required | ftp.password |
| CLASSPATH | filename | required | license.fileName |
| HTTP | HTTP Host | required | http.url |
| HTTP | HTTP Directory | required | http.dir.path |
| HTTP | filename | required | license.fileName |
| HTTP | HTTP Username | required | http.username |
| HTTP | HTTP Password | required | http.password |
| HTTPS | HTTPS Host | required | https.url |
| HTTPS | HTTPS Directory | required | https.dir.path |
| HTTPS | filename | required | license.fileName |
| HTTPS | HTTPS Username | required | https.username |
| HTTPS | HTTPS Password | required | https.password |
| BROKER | License Topic | required | license.licenseTopic |
| DIRECT | License Text | required | license.licenseText |

Truststore Management:

The IBM i connector requires a truststore file "info400new.truststore" when the secure connection property is set to true. This enables access to specific IBM i system(s) over TLS connection. Managing the truststore file can be done in various ways using different protocols, including S3, HTTP/HTTPS, FTP, FILE, etc., and accessing it through these protocols requires configuration in the connector settings.

Available protocols for loading the truststore file include HTTP, HTTPS, FTP, S3, FILE, and CLASSPATH. The specific parameters to configure depend on the chosen protocol.

image

Depending on the protocol type, configure the properties below.

| Protocol | Parameter to configure | Mandatory | Configuration key |
|---|---|---|---|
| FILE | path | required | as400.tlsFile.path |
| FILE | filename | required | truststore.fileName |
| S3 | S3 bucket path | required | s3.bucket.tls |
| S3 | filename | required | truststore.fileName |
| S3 | S3 region | required | s3.region.tls |
| S3 | Access key | required | s3.accessKey.tls |
| S3 | Secret key | required | s3.secretKey.tls |
| S3_IAM | S3 bucket path | required | s3.bucket.tls |
| S3_IAM | filename | required | truststore.fileName |
| S3_IAM | S3 region | required | s3.region.tls |
| FTP | Host | required | ftp.host.tls |
| FTP | directory path | required | ftp.dir.path.tls |
| FTP | filename | required | truststore.fileName |
| FTP | username | required | ftp.username.tls |
| FTP | password | required | ftp.password.tls |
| CLASSPATH | filename | required | license.fileName |
| HTTP | HTTP Host | required | http.url.tls |
| HTTP | HTTP Directory | required | http.dir.path.tls |
| HTTP | filename | required | truststore.fileName |
| HTTP | HTTP Username | required | http.username.tls |
| HTTP | HTTP Password | required | http.password.tls |
| HTTPS | HTTPS Host | required | https.url.tls |
| HTTPS | HTTPS Directory | required | https.dir.path.tls |
| HTTPS | filename | required | truststore.fileName |
| HTTPS | HTTPS Username | required | https.username.tls |
| HTTPS | HTTPS Password | required | https.password.tls |

Please contact Infoview Systems Connector support team at (734) 293-2160 and (+91) 4042707110 or via email sales@infoviewsystems.com and marketing@infoviewsystems.com

AS400 Connection Configuration Properties

  • Connection
| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Name | Enter a unique label for the connector in your application. | Required | AS400SourceConnectorConnector_0 | name |
| AS400 URL | AS400 system connection URL. | Required | null | as400.url |
| User Id | AS400 system user. | Required | null | as400.userId |
| Password | AS400 system connection password. | Required | null | as400.password |
| License protocol | Refer to the License Management section above. | Required | null | as400.license.protocol |
| Truststore file protocol | Refer to the Truststore Management section above. | Optional | null | Truststore file protocol |
| IASP | Independent auxiliary storage pool (IASP) name. | Optional | null | as400.iasp |
| Library List | List of libraries, in addition to the library list associated with the user ID. The libraries must be separated by commas and are added to the top of the library list. | Optional | null | as400.libraries |
| Secure Connection | Enable a secure connection with AS400 over an encrypted channel. | Optional | False | as400.secure.connection |
| Socket Timeout | Socket timeout in ms. The default value of -1 means the default JVM SO_TIMEOUT is used. | Optional | -1 | as400.socket.timeout |
| Time unit to be used for Socket Timeout | Socket timeout time unit. | Optional | MILLISECONDS | as400.socket.timeunit |
| Connection Retries | Number of times to retry establishing the connection internally before throwing an exception and passing control back to the Kafka connection manager. | Optional | 3 | as400.connection.retry |
| Reconnection Period | Time between internal reconnection retries in ms. | Optional | 60000 | as400.reconnection.period |
| Time unit to be used for Reconnection Period | Reconnection period time unit. | Optional | MILLISECONDS | as400.reconnection.timeunit |
| Connection Time to Live | Maximum time (seconds) that the connection can be used. | Optional | 0 | as400.connection.live |
| Reconnection Period time out | Time unit to be used for Connection Time to Live. | Optional | SECONDS | |
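The interaction between Connection Retries and Reconnection Period can be pictured with the following sketch. It is an illustration of the described behaviour, not the connector's actual code. It assumes that the initial attempt does not count as a retry, and the names `connect_with_retries`, `connect` and `sleep` are hypothetical.

```python
import time


def connect_with_retries(connect, retries=3, period_ms=60000, sleep=time.sleep):
    """Call connect(); on failure retry up to `retries` times.

    Waits period_ms milliseconds between attempts and re-raises the
    last error once all retries are used up.
    """
    attempt = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if attempt >= retries:
                raise  # hand the failure back to the caller
            attempt += 1
            sleep(period_ms / 1000.0)  # time.sleep expects seconds
```

With the defaults from the table (3 retries, 60000 ms), a connection that never succeeds is attempted four times in this sketch, with a one-minute pause between attempts, before the exception is raised.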
  • Connection (Optional)
| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Operation Type | Type of operation to be performed on AS400: FILE = 0, PRINT = 1, COMMAND = 2, DATAQUEUE = 3, DATABASE = 4, RECORDACCESS = 5, CENTRAL = 6, SIGNON = 7. | Optional | 2 | as400.operation.type |
| CCSID | CCSID stands for "Coded Character Set Identifier". It is a numerical value that identifies a specific character encoding or character set used in IBM systems, including the IBM i (AS/400) platform. | Optional | 0 | as400.ccsid |
| Pre Start Count Data Queue | - | Optional | 2 | as400.prestart.count.dq |
| Pre Start Count Command | - | Optional | 2 | as400.prestart.count.command |
| Cleanup Interval | - | Optional | 2 | as400.cleanup.interval |
| Max Connection | Maximum connections allowed. | Optional | 5 | as400.max.connection |
| Max Inactivity | Maximum time a connection's session may remain inactive. | Optional | 10 | as400.max.inactivity |
| Max Lifetime | Maximum lifetime for a connection. | Optional | 60000 | as400.max.lifetime |
| Max Use Count | - | Optional | 10 | as400.max.usecount |
| Max Use Time | - | Optional | 30000 | as400.max.usetime |
| Pre-Test Connection | - | Optional | true | as400.pretest.connection |
| Run Maintenance | - | Optional | true | as400.run.maintenance |
| Thread Used | - | Optional | true | as400.thread.used |
| Keep Alive | - | Optional | true | as400.keep.alive |
| Login Timeout | - | Optional | 0 | as400.login.timeout |
| Receive Buffer Size | - | Optional | 1000 | as400.receive.buffer.size |
| Send Buffer Size | - | Optional | 1000 | as400.send.buffer.size |
| So Linger | - | Optional | 0 | as400.so.linger |
| So Timeout | - | Optional | 0 | as400.so.timeout |
| TCP No Delay | - | Optional | true | as400.tcp.nodelay |

AS400 Source Connector Configuration Properties

Configure these connector properties.

image

| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Data Queue | Read data queue name. | Required | null | as400.read.dataqueue.name |
| Library | Read data queue library. | Required | null | as400.read.dataqueue.library |
| Key | Must be specified for keyed data queues and left blank for non-keyed data queues. Used for reading any message from the data queue. | Optional | null | as400.read.dataqueue.key |
| Key Search Type | Must be specified for keyed data queues. Available search types are equal, not equal, greater than, less than, greater than or equal, less than or equal. | Optional | null | as400.read.dataqueue.key.search.type |
| Keep messages in Queue | Ensure it is unchecked unless the intent is to leave the message in the queue after reading. | Optional | true | as400.source.keep.message |
| Format File Name | Optional parameter that allows treating a data queue entry as an externally defined data structure. When defined, the connector dynamically retrieves the record format from the specified IBM i file and parses the received data queue entry into a map of field name / value pairs. The connector performs the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.source.format.name |
| Format File Library | When a format file is specified, the format file library can also be specified; otherwise the format file is located based on the connection library list. | Optional | null | as400.source.file.library |
| Number of Consumers | Number of consumers. | Optional | 4 | as400.source.consumer.numbers |
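To give a feel for what parsing a data queue entry "into a map of field name / value pairs" involves, the sketch below decodes a fixed-width entry by hand. It is only an illustration under stated assumptions. The two-field layout (`ORDERID`, `AMOUNT`) is hypothetical, character data is assumed to be EBCDIC code page 037, and the connector itself retrieves the real layout from the format file rather than from a hard-coded list.

```python
from decimal import Decimal

# Hypothetical record format: (field name, type, length in bytes, decimal places).
LAYOUT = [
    ("ORDERID", "char", 6, 0),
    ("AMOUNT", "packed", 4, 2),  # packed decimal with 7 digits, 2 decimals
]


def unpack_decimal(raw, scale):
    """Decode a packed-decimal field: two digits per byte, sign in the last nibble."""
    digits = []
    for byte in raw[:-1]:
        digits.append(byte >> 4)
        digits.append(byte & 0x0F)
    digits.append(raw[-1] >> 4)
    sign_nibble = raw[-1] & 0x0F
    value = int("".join(str(d) for d in digits))
    if sign_nibble in (0x0B, 0x0D):  # B and D mark negative values
        value = -value
    return Decimal(value).scaleb(-scale)


def parse_entry(entry, layout=LAYOUT):
    """Split a fixed-width entry into a dict of field name / value pairs."""
    fields = {}
    offset = 0
    for name, kind, length, scale in layout:
        raw = entry[offset:offset + length]
        if kind == "char":
            fields[name] = raw.decode("cp037").rstrip()  # EBCDIC to str
        else:
            fields[name] = unpack_decimal(raw, scale)
        offset += length
    return fields
```

For example, an entry made of the EBCDIC bytes for `A00042` followed by `00 12 34 5F` parses to `{"ORDERID": "A00042", "AMOUNT": Decimal("123.45")}`.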
  • Source Response
| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Response Data Queue | Will update the response back to the data queue. | Optional | null | as400.write.dataqueue.name |
| Response Data Queue Library | Response data queue library. | Optional | null | as400.write.dataqueue.library |
| Response Data Queue Expression | Response data queue expression. | Optional | null | as400.response.dataqueue.expression |

JsonConverter(Source)

Note: Here is a sample property for AS400 source connector with JsonConverter configuration.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schemas.enable": "false"

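  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"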
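Besides Confluent Control Center, a configuration such as the one above can be submitted to the Kafka Connect REST API with a `POST` to `/connectors`. The sketch below builds that request with the Python standard library. The worker address `http://localhost:8083` is Kafka Connect's default REST port and is an assumption about your environment; the helper name `build_create_request` is hypothetical, and the configuration is a trimmed copy of the sample above with placeholder values.

```python
import json
import urllib.request

# Trimmed copy of the sample configuration above; replace the placeholders.
sample_connector = {
    "name": "AS400SourceConnector",
    "config": {
        "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "as400.url": "your-ibm-i-host",
        "as400.userId": "your-username",
        "as400.password": "your-password",
        "as400.license.protocol": "FILE",
        "as400.license.path": "/opt",
        "license.fileName": "as400-license.lic",
        "as400.read.dataqueue.name": "your-data-queue-name",
        "as400.read.dataqueue.library": "your-Library",
        "source.kafka.topic": "Test",
    },
}


def build_create_request(connector, connect_url="http://localhost:8083"):
    """Build the POST /connectors request that creates a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(sample_connector)
# To actually submit it against a running Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes it easy to inspect the JSON body first, which is useful because a stray comment or trailing comma in the configuration causes the worker to reject it.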

When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schemas.enable": "false"

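  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"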

In this example, as400.read.dataqueue.key is set to the placeholder your-key, and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value. Similarly, a key of 123 with the search type equal would read only entries whose key is equal to 123.

You can replace the search type with any of the other supported values (equal, not equal, greater than, less than, less than or equal) depending on your requirements.
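The effect of each search type can be illustrated with a small stand-alone sketch. It only models which entry keys qualify for a given search key. The search type names follow the configuration table above, the helper name `matching_keys` is hypothetical, and plain Python string comparison stands in for the comparison performed on the IBM i side.

```python
import operator

# Search types from the configuration table, mapped to comparison functions.
SEARCH_TYPES = {
    "equal": operator.eq,
    "not equal": operator.ne,
    "greater than": operator.gt,
    "less than": operator.lt,
    "greater than or equal": operator.ge,
    "less than or equal": operator.le,
}


def matching_keys(entry_keys, key, search_type):
    """Return the entry keys k for which `k <search_type> key` holds."""
    compare = SEARCH_TYPES[search_type]
    return [k for k in entry_keys if compare(k, key)]


keys_in_queue = ["100", "123", "200"]
print(matching_keys(keys_in_queue, "123", "equal"))                  # ['123']
print(matching_keys(keys_in_queue, "123", "greater than or equal"))  # ['123', '200']
```

Keys are compared as equal-length strings here. Keyed data queues use fixed-length keys, so padding a key to the queue's key length matters when choosing a search value.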

JsonSchemaConverter(Source)

Note: Here is a sample property for AS400 source connector with JsonSchemaConverter and schema registry configuration.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

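  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"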

When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

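  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"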

In this example, as400.read.dataqueue.key is set to the placeholder your-key, and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value. Similarly, a key of 123 with the search type equal would read only entries whose key is equal to 123.

You can replace the search type with any of the other supported values (equal, not equal, greater than, less than, less than or equal) depending on your requirements.

AvroConverter(Source)

Note: Here is a sample property for AS400 source connector with AvroConverter configuration.

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "/opt",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

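  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"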

When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and the search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.

Note: Here's how you might include these properties in your configuration:

{
  "name": "AS400SourceConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.read.dataqueue.name": "your-data-queue-name",
    "as400.read.dataqueue.library": "your-Library",
    "as400.source.format.name": "format-file-name",
    "as400.source.file.library": "your-format-file-library",
    "source.kafka.topic": "Test",
    "as400.read.dataqueue.key": "your-key",
    "as400.read.dataqueue.key.search.type":"greater than or equal",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://localhost:8081"

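  }
}

If security is enabled on the Kafka cluster, also add the following optional properties inside "config" (they are shown separately because JSON does not allow comments). Set "sasl.mechanism" to PLAIN or to a SCRAM mechanism as appropriate, and replace the placeholder credentials with your own:

    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-kafka-username\" password=\"your-kafka-password\";"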

In this example, as400.read.dataqueue.key is set to the placeholder your-key, and as400.read.dataqueue.key.search.type is set to greater than or equal, so the connector reads entries whose key is greater than or equal to the specified value. Similarly, a key of 123 with the search type equal would read only entries whose key is equal to 123.

You can replace the search type with any of the other supported values (equal, not equal, greater than, less than, less than or equal) depending on your requirements.

Source Acknowledgement

The source connector can send a response back to a data queue. The following configuration sends the response through a program call instead.

    "source.ack.type": "Program",
    "as400.program.name": "CDCPGMACKR",
    "as400.program.library": "INFOCDCCOM",
    "as400.program.libraryList": "INFOCDCDEM,INFOCDCCOM",

AS400 Data Queue Sink Connector Configuration Properties

Configure these connector properties.

image

| Parameter | Description | Mandatory | Default Value | Configuration key |
|---|---|---|---|---|
| Data Queue | Write data queue name. | Required | null | as400.write.dataqueue.name |
| Library | Write data queue library. | Required | null | as400.write.dataqueue.library |
| Is Keyed DataQ | Set to true when writing to a keyed data queue; leave as false for non-keyed data queues. | Optional | false | as400.write.dataqueue.key |
| Format File Name | Optional parameter that allows treating a data queue entry as an externally defined data structure. When defined, the connector dynamically retrieves the record format from the specified IBM i file and parses the received data queue entry into a map of field name / value pairs. The connector performs the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.sink.format.name |
| Format File Library | When a format file is specified, the format file library can also be specified; otherwise the format file is located based on the connection library list. | Optional | null | as400.sink.file.library |
| DQ Entry Length | Max DQ entry length. When specified and greater than 0, the parameter value will be truncated to fit the max length. | Optional | 0 | as400.dq.entry.length |
| DQ Key Length | Max DQ key length. When specified and greater than 0, the parameter value will be used (instead of dynamically retrieving it from DQ definitions on the server). | Optional | null | as400.dq.key.length |

JsonConverter(Sink)

Note: Here is a sample property for AS400 Data Queue Sink connector with JsonConverter and Dead letter Queue configuration.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "json_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "json_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

JsonSchemaConverter(Sink)

Note: Here is a sample configuration for the AS400 Data Queue Sink connector with JsonSchemaConverter, Schema Registry, and a dead letter queue.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

AvroConverter(Sink)

Note: Here is a sample configuration for the AS400 Data Queue Sink connector with AvroConverter, Schema Registry, and a dead letter queue.

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "avro_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.

Note: Here's an example of how you might include this property in your configuration:

{
  "name": "AS400DataQueueSinkConnectorConnector_0",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "Test",
    "transforms": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "avro_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.write.dataqueue.name": "your-data-queue-name",
    "as400.write.dataqueue.library": "your-library",
    "as400.sink.format.name": "format-file-name",
    "as400.sink.file.library": "format-file-library",
    "as400.write.dataqueue.key":"true",
    "value.converter.schema.registry.url": "http://localhost:8081"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}
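
The Data Queue sink samples above differ only in the value converter settings, the dead letter queue topic name, and the optional `as400.write.dataqueue.key` flag. A minimal Python sketch of assembling them from one template is shown below. The helper name `build_dq_sink_config` and the short converter labels are hypothetical; the property names and placeholder values are copied from the samples.

```python
import json

# Converter-specific properties, copied from the samples above.
CONVERTER_SETTINGS = {
    "json": {
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    },
    "json_schema": {
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    },
    "avro": {
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    },
}


def build_dq_sink_config(converter: str, dlq_topic: str, keyed: bool = False) -> dict:
    """Assemble a Data Queue sink connector definition (hypothetical helper)."""
    config = {
        "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "Test",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": dlq_topic,
        "errors.deadletterqueue.topic.replication.factor": "1",
        "errors.deadletterqueue.context.headers.enable": "true",
        "as400.url": "your-ibm-i-host",
        "as400.userId": "your-username",
        "as400.password": "your-password",
        "as400.secure.connection": "false",
        "as400.license.protocol": "FILE",
        "as400.license.path": "license-file-path",
        "license.fileName": "as400-license.lic",
        "as400.write.dataqueue.name": "your-data-queue-name",
        "as400.write.dataqueue.library": "your-library",
        "as400.sink.format.name": "format-file-name",
        "as400.sink.file.library": "format-file-library",
    }
    config.update(CONVERTER_SETTINGS[converter])
    if keyed:
        # Only needed when the target is a keyed data queue.
        config["as400.write.dataqueue.key"] = "true"
    return {"name": "AS400DataQueueSinkConnectorConnector_0", "config": config}


print(json.dumps(build_dq_sink_config("avro", "avro_dlq", keyed=True), indent=2))
```

Generating the definition this way also avoids the inline comments used in the samples, which plain JSON parsers do not accept.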

AS400 Program Call Sink Connector Configuration Properties

Configure these connector properties

| Parameter | Description | Mandatory | Default Value | Configuration Key |
|-----------|-------------|-----------|---------------|-------------------|
|Program Name|AS400 program name|Required|null|as400.program.name|
|Program Library|Program library name|Required|null|as400.program.library|
|Program Parameters|List of definitions and value references of program parameters|Optional|null|as400.program.parameters|
|Procedure Name|Name of the procedure|Optional|null|as400.procedure.name|
|Procedure Returns Value|Indicates whether the program procedure returns a value|Optional|false|as400.procedure.returnsValue|
|Threadsafe|Indicates whether the program is thread safe|Optional|false|as400.threadsafe|
|Sink Target Topic|Topic to which the program call output is pushed|Required|null|sink.kafka.topic|
|Kafka Partition Key|Kafka partition key used to push the record to a specific location in the topic|Required|null|sink.kafka.partition.key|

Note: Here is a sample configuration for the AS400 Program Call Sink connector.

{
  "name": "AS400ProgramCallSinkConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "CRTORDER_INPUT",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.program.name": "program-name",
    "as400.program.library": "KFKDEMOS",
    "as400.program.libraryList": "comma-separated-list-of-libraries",
    "as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, 
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": 
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", 
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": 
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { 
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, 
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", 
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
    "as400.threadsafe": "false",
    "value.converter.schemas.enable": "false"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

Note: The as400.procedure.name property defines which procedure to run when calling Program (*PGM) and Service Program (*SRVPGM) objects on the IBM i (AS/400) platform. It applies to the two object types as follows:

For PGM Object:

When calling a Program (*PGM object), you typically don't need to specify a procedure name, as Programs in this context are generally standalone units of code that are executed without calling a specific procedure within them.

For SRVPGM Object:

Service Programs (*SRVPGM objects) are designed to be reused by multiple programs, and they can contain multiple exported procedures that can be called individually. To call a specific procedure within a Service Program, the procedure name must match the exported name of the procedure you want to execute.

Use the as400.procedure.name property in your connector configuration to specify the procedure name when calling a Service Program.

Note: Here's an example of how you might use it in your connector configuration:

{
  "name": "AS400ProgramCallSinkConnector",
  "config": {
    "connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "CRTORDER_INPUT",
    "as400.url": "your-ibm-i-host",
    "as400.userId": "your-username",
    "as400.password": "your-password",
    "as400.secure.connection": "false",
    "as400.license.protocol": "FILE",
    "as400.license.path": "license-file-path",
    "license.fileName": "as400-license.lic",
    "as400.program.name": "program-name",
    "as400.program.library": "KFKDEMOS",
    "as400.program.libraryList": "comma-separated-list-of-libraries",
    "as400.program.libraryList.mode": "ADD_FIRST",
    "as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, 
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": 
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", 
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": 
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { 
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, 
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", 
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
    "as400.threadsafe": "false",
    "value.converter.schemas.enable": "false",
    "as400.procedure.name":"your-procedure-name"

    /*Below are optional (Required if security enabled) */
    // "security.protocol": "SASL_SSL",
    // "sasl.mechanism": "PLAIN/SCRAM",
    // "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
  }
}

IBM i program parameter definitions:

```
 d main            pr                  extpgm('CRTORDER')
 d  srcOrder                      8p 0
 d  trgtOrder                     8p 0
 d  ordAmt                       10p 2
 d  ordSts                       10a
 d  nbrLines                      4p 0
 d    linesIn                          likeds(linesInFmt) dim(10)

 d linesInfmt      DS                  Dim(10) qualified
 d   itemNo                      10a
 d   qty                          5p 0
 d   price                       10p 2
```



Example: Input Structure

```json
{
    "sourceOrderId": 11934700,
    "targetOrderId": 11934700,
    "orderAmt": 1000.00,
    "orderStatus": "Activated",
    "noOfItems": 2,
    "linesIn": [
        {
            "itemNo": "111",
            "quantity": 10,
            "price": 10.00
        },
        {
            "itemNo": "222",
            "quantity": 90,
            "price": 10.00
        }
    ]
}
```

as400.program.parameters: the program parameters must be defined in the following structure:

```json
{ "params": [ { "parameterName": "sourceOrderId", "sourceFieldName": "$['sourceOrderId']", "dataType": "PACKED", "length": 8, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "targetOrderId", "sourceFieldName": "$['targetOrderId']", "dataType": "PACKED", "length": 8, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderAmt", "sourceFieldName": "$['orderAmt']", "dataType": "PACKED", "length": 10, "decimalPositions": 2, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderStatus", "sourceFieldName": "$['orderStatus']", "dataType": "STRING", "length": 10, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "numberOfLines", "sourceFieldName": "$['noOfItems']", "dataType": "PACKED", "length": 4, "decimalPositions": 0, "usage": "IN", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderLines", "sourceFieldName": "$['linesIn']", "dataType": "STRUCTURE", "length": 0, "decimalPositions": 0, "usage": "INOUT", "count": 10, "dataStructureElements": [ { "parameterName": "itemNo", "sourceFieldName": "$['itemNo']", "dataType": "STRING", "length": 10, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "quantity", "sourceFieldName": "$['quantity']", "dataType": "PACKED", "length": 5, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "price", "sourceFieldName": "$['price']", "dataType": "PACKED", "length": 10, "decimalPositions": 2, "usage": "INOUT", "count": 1, "dataStructureElements": [] } ] } ] }
```


**Note:** The field names in the input structure must match the respective sourceFieldName values so that the connector can map the values accordingly.
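
The matching rule in this note can be checked before a record is sent to the topic. The sketch below is a hypothetical pre-flight check in Python, not part of the connector. The helper name `missing_fields` is an assumption, and it only understands the simple `$['fieldName']` reference form used in the examples here.

```python
import re

# Matches the "$['fieldName']" references used in sourceFieldName.
FIELD_REF = re.compile(r"^\$\['([^']+)'\]$")


def missing_fields(params: list, record: dict, prefix: str = "") -> list:
    """List the referenced fields that are absent from the input record."""
    missing = []
    for param in params:
        match = FIELD_REF.match(param["sourceFieldName"])
        if match is None:
            # Not a simple field reference; report it as written.
            missing.append(prefix + param["sourceFieldName"])
            continue
        name = match.group(1)
        if name not in record:
            missing.append(prefix + name)
            continue
        children = param.get("dataStructureElements") or []
        if children:
            value = record[name]
            rows = value if isinstance(value, list) else [value]
            # Check every element of a structure array separately.
            for index, row in enumerate(rows):
                missing += missing_fields(children, row, f"{prefix}{name}[{index}].")
    return missing


params = [
    {"parameterName": "orderAmt", "sourceFieldName": "$['orderAmt']", "dataStructureElements": []},
    {"parameterName": "orderLines", "sourceFieldName": "$['linesIn']", "dataStructureElements": [
        {"parameterName": "itemNo", "sourceFieldName": "$['itemNo']", "dataStructureElements": []},
        {"parameterName": "quantity", "sourceFieldName": "$['quantity']", "dataStructureElements": []},
    ]},
]
record = {"orderAmt": 1000.00, "linesIn": [{"itemNo": "111", "quantity": 10}, {"itemNo": "222"}]}
print(missing_fields(params, record))  # ['linesIn[1].quantity']
```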

Once you have the structure above, stringify it. The result looks like this:



{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", \"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\", \"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": \"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { \"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }
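
The stringify step can be scripted rather than done by hand. The following is a minimal Python sketch using only the standard `json` module; the parameter list is shortened to one entry for brevity, and the variable names are illustrative.

```python
import json

# A shortened parameter definition; the full structure is shown above.
program_parameters = {
    "params": [
        {
            "parameterName": "sourceOrderId",
            "sourceFieldName": "$['sourceOrderId']",
            "dataType": "PACKED",
            "length": 8,
            "decimalPositions": 0,
            "usage": "INOUT",
            "count": 1,
            "dataStructureElements": [],
        }
    ]
}

# First dumps(): turn the structure into the string value the property expects.
stringified = json.dumps(program_parameters)

# Second dumps(): serializing the surrounding config escapes the inner quotes as \".
connector = {
    "name": "AS400ProgramCallSinkConnector",
    "config": {"as400.program.parameters": stringified},
}
connector_json = json.dumps(connector, indent=2)
print(connector_json)
```

Serializing twice keeps the escaping consistent, which is easy to get wrong when the quotes are escaped manually.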


The configuration for the Program Call Sink connector is now complete, and the connector is ready to deploy.
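
One way to deploy the finished configuration is the Kafka Connect REST API. The sketch below assumes a self-managed Connect worker whose REST endpoint is at `http://localhost:8083` (the usual default, but an assumption here); the function names are hypothetical. Remove the inline comments from the sample configuration first, since they are not valid JSON.

```python
import json
import urllib.request


def build_create_request(connector: dict, connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def deploy(connector: dict, connect_url: str = "http://localhost:8083") -> dict:
    """Send the request and return the worker's JSON response."""
    with urllib.request.urlopen(build_create_request(connector, connect_url)) as response:
        return json.load(response)
```

Calling `deploy({"name": "AS400ProgramCallSinkConnector", "config": {...}})` with the full configuration submits the connector to the worker.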


## TLS Configuration

| Parameter     | Description                                               |Mandatory                          |Default Value|
|---------------|-----------------------------------------------------------|-----------------------------------|-------------|
|Truststore|The truststore holds JKS-type certificates from Certificate Authorities (CAs), which are used to verify the certificate presented by the server in an SSL connection.| | |


![image](https://user-images.githubusercontent.com/46368616/133774025-d397bcd4-b88f-49a7-b06f-305b0dac9f5b.png)


## Truststore

| Parameter     | Description                                               |Mandatory                          |Default Value|Configuration Key|
|---------------|-----------------------------------------------------------|-----------------------------------|-------------|---------------------------------|
|Truststore Filename|Truststore file name.|Optional|null|truststore.fileName|
|Password|The password used to protect the truststore.|Optional|null|TLS.password|
|Insecure|If true, no certificate validations are performed, rendering connections vulnerable to attacks. Use at your own risk.|Optional|false|TLS.isInsecure|
|IsKeystoreConfigured|-|Optional|true|TLS.isKeystoreConfigure|
|IsTruststoreConfigured|-|Optional|true|TLS.isTruststoreConfigured|

## Schema Registry Configuration
Schema Registry must be configured for the schema converters listed below, to avoid problems registering updated schemas after the format file is updated.

**Avro**
```properties
io.confluent.connect.avro.AvroConverter
```

**JSON Schema**
```properties
io.confluent.connect.json.JsonSchemaConverter
```

Schema Registry can be configured in 3 ways:

  1. Through Confluent Control Center

Open the Topics menu and choose the topic where new messages are published. Then click the Schema field to see the already registered schema. Open the pull-down menu (...) and choose Compatibility settings.

Then choose NONE as the compatibility level and save the changes.

These changes apply only to the edited topic. If messages with a schema are published to other topics, repeat these steps for each of them. To avoid repeating the steps for every new topic, use one of the other options below.

  2. Through Confluent Platform properties file

Open the Confluent installation folder and go to the /etc/schema-registry/ folder.

Edit the schema-registry.properties file with the following command:

nano schema-registry.properties

Add a new property at the end of the file:

schema.compatibility.level=none

Save the changes and restart Confluent Platform if needed to apply them. NONE then becomes the default compatibility level for new schemas.

  3. Through Docker Compose File

Add a new property to the environment section of the schema-registry container:

SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: none

Then restart the Docker container with the following command, if needed, to apply the changes:

docker-compose up -d

NONE then becomes the default compatibility level for new schemas.
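
The per-topic setting from option 1 can also be scripted against the Schema Registry REST API instead of being clicked through in Control Center. The sketch below is an assumption-laden illustration, not a documented step of this connector: it assumes Schema Registry is reachable at `http://localhost:8081`, that value schemas are registered under the default `<topic>-value` subject name, and the function name is hypothetical.

```python
import json
import urllib.request


def build_compatibility_request(
    topic: str,
    level: str = "NONE",
    registry_url: str = "http://localhost:8081",
) -> urllib.request.Request:
    """Build a PUT /config/{subject} request that sets the subject's compatibility level."""
    subject = f"{topic}-value"  # assumed default subject naming for record values
    return urllib.request.Request(
        url=f"{registry_url}/config/{subject}",
        data=json.dumps({"compatibility": level}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="PUT",
    )


# To apply the change, send the request, for example:
# urllib.request.urlopen(build_compatibility_request("Test"))
```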

Log Exceptions

The connector can log all critical exceptions to a dedicated topic (as400-error-log) when log.exceptions=true is set. The default value of log.exceptions is false, which means the connector does not log any exceptions to the topic.

Contact Us

Contact us for connector pricing info, trial license, or support questions.