User Manual
Introduction
Infoview Systems' InfoConnect suite of products eliminates the stress and impact of integrating IBM i / AS400 legacy systems with development teams. It minimizes the time and resources required for manual integration building, while enabling non-IBM i developers to access legacy business logic and data directly within their modern integration development stack. Certified and rigorously tested by Confluent, the connector is designed to expedite IBM i / AS400 integrations with other systems and services.
We are a global cross-platform service team with a unique fluency in both legacy and modern technology stacks, including Confluent and IBM i / AS400. Infoview's dedicated customer success representatives coordinate just-in-time technical assistance and support for client teams, ensuring you have all the help you need, when you need it. We are more than happy to provide a trial license for our products, participate in discovery sessions, conduct live demos for typical integration scenarios with our Gateway products, and assist with or perform a proof of concept based on specific use cases.
Feel free to Contact us for connector pricing information, trial licenses, or any support-related questions.
IBM i / AS400 Connector for Confluent Overview
The IBM i / AS400 was first introduced in 1988. Since then, it has evolved while keeping pace with new concepts and technologies. It is a remarkably stable and robust modern all-purpose system that surprisingly requires little to no management. The system can securely and predictably run core line of business applications, focusing on quality of service and availability. It offers a compelling total cost of ownership for integrated on-premises solutions. IBM has made several changes to the server and OS name (iSeries, System i, IBM i, Power Systems for i), but most still refer to it as AS/400.
The IBM i platform offers several integration options, including PHP, Java, WebSphere, specialized lightweight web service containers, FTP, SMTP/emails, DB2 interfaces, data queues, integrated file systems - IFS, as well as an abundance of products offered by IBM and third-party vendors. The main benefit of using "native" options, such as Program Calls and Data Queues, is that IBM i development teams do not have to learn another language or purchase and support another technology to build the integration layer. They can easily communicate with external systems using only traditional development tools. The Program Call is the most straightforward and low-code option for exposing IBM i business logic as a reusable asset.
The IBM i connector enables direct program calls from Kafka applications, allowing parameters to be passed into the program and receiving results back in real-time. Data queues are native IBM i objects primarily designed for inter-process communications. They are lightweight persistent queues that support processing by a key and support ordering by FIFO or LIFO organization. The majority of integration use cases can be implemented with a pair of request and response Data Queues. In this setup, the source system places a message in the request data queue and waits for an acknowledgement message to be placed in the response data queue. The target system receives and processes a message from the request data queue, then places the acknowledgement in the response data queue.
The IBM i connector makes it easy to build Kafka code that communicates with IBM i applications. Infoview Systems has also developed the IBM i 'Web Transaction Framework', which makes it very fast and easy to develop IBM i code that communicates with Kafka.
Prerequisites
The connector is designed to operate with IBM i objects (such as programs and data queues). Therefore, this document assumes that you are familiar with IBM i operational and development environments and tools. Additionally, it assumes your familiarity with Confluent Kafka, Kafka Connect, and Confluent Control Center. The document offers configuration examples and provides a detailed explanation of each, along with the properties required to run the connector effectively within Confluent Control Center.
IBM i server configuration and requirements
- IBM i must have the following ports accessible from the Confluent runtime: 446, 448, 449, 8470, 8472, 8473, 8475, 8476, 9470, 9472, 9473, 9475, and 9476. (A quick connectivity check is sketched after this list.)
- IBM i must have the following host servers running in the QSYSWRK subsystem: *CENTRAL, *DTAQ, *RMTCMD, *SIGNON, and *SRVMAP.
- If a secure TLS connection is being used, the TLS certificate must be applied to the following services in Digital Certificate Manager: Central, Data Queue, Remote Command, Sign on, and DDM / DRDA services.
- The IBM i user ID must be authorized to perform operations on the intended IBM i objects.
- If there is additional security software in place that restricts remote execution functionality, the IBM i user ID defined for the connector configuration must be granted permission to execute remote calls and access database, IFS, and DDM services.
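The following is a minimal sketch of how the port prerequisites might be verified from the Kafka Connect host; it assumes the netcat (nc) utility is available, and the host name is a placeholder.

```bash
# Replace with your IBM i system host name (hypothetical placeholder).
AS400_HOST=your-ibm-i-host

# Verify that the non-TLS host server ports used by the connector are reachable
# from the Kafka Connect host (the 94xx ports are the TLS equivalents).
for port in 446 448 449 8470 8472 8473 8475 8476; do
  nc -vz -w 5 "$AS400_HOST" "$port"
done
```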
Dependencies
- Access to the IBM i server is required.
- Access to the Confluent Kafka platform is required.
Compatibility Matrix
Application/Service | Version |
---|---|
Confluent Kafka | 6.0.0 or higher |
Confluent Control Center | 6.0.0 or higher |
IBM i / OS400 | V5R4 or higher |
Connector Operations
The IBM i connector is an operation-based connector. This means that when you add the connector to your Kafka Connect cluster, you must configure the specific operation the connector will perform. The connector supports the following operations:
Operation | Description |
---|---|
Read Data Queue (Message Source) | The connector is capable of continuously monitoring for new messages that arrive at a designated data queue. |
Write to Data Queue | The connector can write messages to a data queue. |
Program Call | The connector is able to execute an IBM i program. |
Confluent Setup
- Steps to set up a Confluent Kafka standalone environment on a local system:
https://docs.confluent.io/5.5.0/quickstart/ce-quickstart.html
- Steps to set up a Confluent Kafka standalone environment in the cloud:
https://dzone.com/articles/installing-and-configuring-confluent-platform-kafk
Once Confluent Kafka is installed, follow the steps below to install the connector.
Install the AS400 Connector
Connectors are packaged as Kafka Connect plugins. Kafka Connect isolates each plugin so that the plugin libraries do not conflict with each other. Download and extract the ZIP file for your connector, then follow the installation instructions below.
- In Standalone mode
- The kafka-connect-as400 connector can be downloaded directly from Confluent Hub using:
confluent-hub install infoviewsystems/kafka-connect-as400:latest
- Extract the content into the desired location (preferred: /confluent/share/java or /confluent/share/confluent-hub-component). confluent-hub generally installs to the /confluent/share/confluent-hub-component location, so there is usually no need to extract it manually.
- Start Confluent Control Center using the command "confluent local services start".
- Control Center should now be up and running and can be verified at http://{HOST}:9021.
- Source and Sink connectors are now ready to configure. Sample configurations are provided below.
- Confluent setup and connector installation through Docker
- Please find a sample docker-compose.yml file. Note that, depending on the host OS and your specific configuration, you may have to make changes, for example to specify the fully qualified path to the license and truststore folders, and other changes as necessary.
- Check the connect service in the above file for the volume mapping:
volumes:
/home/ubuntu/license/:/opt/
Create a license directory on the local system (e.g. mkdir license) and place the as400-license.lic file in that directory. When docker-compose.yml runs, the license file (as400-license.lic) is copied from the local directory to the container path /opt/.
- Now execute the command below:
docker-compose up -d
It will download all images from Docker Hub and install them. Once the download is complete, verify that all services are up and running with:
docker-compose ps -a
- To verify that the license was copied to /opt/, execute the command below to connect to the Kafka Connect service in interactive mode:
docker exec -it connect bash
Then execute the commands below to validate:
cd /opt/
ls -l
- To verify that infoviewsystems-as400-kafka-connect is installed, go through the steps below:
cd /usr/share/confluent-hub-components/
ls -l
- Control Center should be up and running and can be verified at http://{HOST}:9021.
- Source and Sink connectors are now ready to configure. Sample configurations are provided below.
License Management:
The IBM i connector requires a license file "as400-license.lic" from Infoview to enable access to specific IBM i system(s). The license file can be stored and loaded in several ways, using protocols such as S3, HTTP/HTTPS, FTP, FILE, and CLASSPATH; the chosen protocol and its access parameters must be set in the connector configuration.
Configure the following properties based on the protocol type:
Protocols | Parameters to configure | Mandatory | Configuration keys for parameters |
---|---|---|---|
FILE | path, filename | Required | as400.license.path, license.fileName |
S3 | S3 bucket path, filename, S3 region, Access key, Secret key | Required | s3.bucket, license.fileName, s3.region, s3.accessKey, s3.secretKey |
S3_IAM | S3 bucket path, filename, S3 region | Required | s3.bucket, license.fileName, s3.region |
FTP | Host, directory path, filename, username, password | Required | ftp.host, ftp.dir.path, license.fileName, ftp.username, ftp.password |
CLASSPATH | filename | Required | license.fileName |
HTTP | HTTP host, HTTP directory, filename, HTTP username, HTTP password | Required | http.url, http.dir.path, license.fileName, http.username, http.password |
HTTPS | HTTPS host, HTTPS directory, filename, HTTPS username, HTTPS password | Required | https.url, https.dir.path, license.fileName, https.username, https.password |
BROKER | License topic | Required | license.licenseTopic |
DIRECT | License text | Required | license.licenseText |
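For example, if the license file is hosted in S3, the license-related portion of a connector configuration might look like the sketch below; the bucket, region, and credentials are placeholders, and the remaining connector properties are omitted.

```
"as400.license.protocol": "S3",
"s3.bucket": "your-license-bucket",
"license.fileName": "as400-license.lic",
"s3.region": "your-s3-region",
"s3.accessKey": "your-access-key",
"s3.secretKey": "your-secret-key"
```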
Truststore Management:
The IBM i connector requires a truststore file "info400new.truststore" when the secure connection property is set to true. This enables access to specific IBM i system(s) over TLS connection. Managing the truststore file can be done in various ways using different protocols, including S3, HTTP/HTTPS, FTP, FILE, etc., and accessing it through these protocols requires configuration in the connector settings.
Available protocols for loading the truststore file include HTTP, HTTPS, FTP, S3, FILE, and CLASSPATH. The specific parameters to configure depend on the chosen protocol.
Configure the following properties based on the protocol type:
Protocols | Parameters to configure | Mandatory | Configuration keys for parameters |
---|---|---|---|
FILE | path, filename | Required | as400.tlsFile.path, truststore.fileName |
S3 | S3 bucket path, filename, S3 region, Access key, Secret key | Required | s3.bucket.tls, truststore.fileName, s3.region.tls, s3.accessKey.tls, s3.secretKey.tls |
S3_IAM | S3 bucket path, filename, S3 region | Required | s3.bucket.tls, truststore.fileName, s3.region.tls |
FTP | Host, directory path, filename, username, password | Required | ftp.host.tls, ftp.dir.path.tls, truststore.fileName, ftp.username.tls, ftp.password.tls |
CLASSPATH | filename | Required | license.fileName |
HTTP | HTTP host, HTTP directory, filename, HTTP username, HTTP password | Required | http.url.tls, http.dir.path.tls, truststore.fileName, http.username.tls, http.password.tls |
HTTPS | HTTPS host, HTTPS directory, filename, HTTPS username, HTTPS password | Required | https.url.tls, https.dir.path.tls, truststore.fileName, https.username.tls, https.password.tls |
Please contact the Infoview Systems connector support team at (734) 293-2160 or (+91) 4042707110, or via email at sales@infoviewsystems.com and marketing@infoviewsystems.com.
AS400 Connection Configuration Properties
- Connection
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Name | Enter a unique label for the connector in your application. | Required | AS400SourceConnectorConnector_0 | name |
AS400 URL | AS400 system connection url. | Required | null | as400.url |
User Id | AS400 System user | Required | null | as400.userId |
PASSWORD | AS400 system connection password. | Required | null | as400.password |
License protocol | Please refer to the License Management section above. | Required | null | as400.license.protocol |
Truststore file protocol | Please refer to the Truststore Management section above. | Optional | null | Truststore file protocol |
IASP | Independent auxiliary storage pool (IASP) name to connect to, if applicable. | Optional | null | as400.iasp |
Library List | List of libraries, in addition to the library list associated with the user ID. The libraries must be separated by commas and will be added to the top of the library list. | Optional | null | as400.libraries |
Secure Connection | Enable secure connection with AS400 over encrypted channel. | Optional | False | as400.secure.connection |
Socket Timeout | Socket Timeout, ms. Default value of -1 means the default JVM SO_TIMEOUT will be used | Optional | -1 | as400.socket.timeout |
Time unit to be used for Socket Timeout | Socket Timeout time unit | Optional | MILLISECONDS | as400.socket.timeunit |
Connection Retries | Number of times to retry establishing the connection internally before throwing an exception and passing back to Kafka connection Manager. | Optional | 3 | as400.connection.retry |
Reconnection Period | Time between internal reconnection retries in ms. | Optional | 60000 | as400.reconnection.period |
Time unit to be used for Reconnection Period | Reconnection period time unit. | Optional | MILLISECONDS | as400.reconnection.timeunit |
Connection Time to Live | Max time (Seconds) that the connection can be used. | Optional | 0 | as400.connection.live |
Time unit to be used for Connection Time to Live | Connection time-to-live time unit. | Optional | SECONDS | |
- Connection (Optional)
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Operation Type | Operation type to be performed on AS400: FILE = 0, PRINT = 1, COMMAND = 2, DATAQUEUE = 3, DATABASE = 4, RECORDACCESS = 5, CENTRAL = 6, SIGNON = 7 | Optional | 2 | as400.operation.type |
CCSID | CCSID stands for "Coded Character Set Identifier." It is a numerical representation that defines a specific character encoding or character set used in IBM systems, including the IBM i (AS/400) platform. | Optional | 0 | as400.ccsid |
Pre Start Count Data Queue | - | Optional | 2 | as400.prestart.count.dq |
Pre Start Count Command | - | Optional | 2 | as400.prestart.count.command |
Cleanup Interval | - | Optional | 2 | as400.cleanup.interval |
Max Connection | Maximum connections allowed. | Optional | 5 | as400.max.connection |
Max Inactivity | Maximum time a connection can remain inactive before the session is considered inactive. | Optional | 10 | as400.max.inactivity |
Max Lifetime | Maximum lifetime for connection. | Optional | 60000 | as400.max.lifetime |
Max Use Count | - | Optional | 10 | as400.max.usecount |
Max Use Time | - | Optional | 30000 | as400.max.usetime |
Pre-Test Connection | - | Optional | true | as400.pretest.connection |
Run Maintenance | - | Optional | true | as400.run.maintenance |
Thread Used | - | Optional | true | as400.thread.used |
Keep Alive | - | Optional | true | as400.keep.alive |
Login Timeout | - | Optional | 0 | as400.login.timeout |
Receive Buffer Size | - | Optional | 1000 | as400.receive.buffer.size |
Send Buffer Size | - | Optional | 1000 | as400.send.buffer.size |
So Linger | - | Optional | 0 | as400.so.linger |
So Timeout | - | Optional | 0 | as400.so.timeout |
TCP No Delay | - | Optional | true | as400.tcp.nodelay |
AS400 Source Connector Configuration Properties
Configure these connector properties.
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Data Queue | Read data queue name. | Required | null | as400.read.dataqueue.name |
Library | Read data queue library. | Required | null | as400.read.dataqueue.library |
Key | Must be specified for keyed data queues and left blank for non-keyed data queues; used when reading messages from the data queue. | Optional | null | as400.read.dataqueue.key |
Key Search Type | Must be specified for keyed data queues when reading messages; available search types are equal, not equal, greater than, less than, greater than or equal, and less than or equal. | Optional | null | as400.read.dataqueue.key.search.type |
Keep messages in Queue | Ensure it is unchecked unless the intent is to leave the message in the queue after reading. | Optional | true | as400.source.keep.message |
Format File Name | Optional parameter allows treating data queue entry as an externally defined data structure. When defined, the connector will dynamically retrieve the record format from the specified IBM i file, and parse the received data queue entry into the map of field name / value pairs. The connector will perform the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.source.format.name |
Format File Library | When format file is specified, the format file library can also be specified, otherwise the format file will be located based on the connection library list. | Optional | null | as400.source.file.library |
Number of Consumers | Number of consumers. | Optional | 4 | as400.source.consumer.numbers |
- Source Response
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Response Data Queue | Data queue to which the response is written back. | Optional | null | as400.write.dataqueue.name |
Response Data Queue Library | Response data queue library. | Optional | null | as400.write.dataqueue.library |
Response Data Queue Expression | Response data queue Expression. | Optional | null | as400.response.dataqueue.expression |
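For example, to have the source connector write an acknowledgement back to a response data queue, a fragment like the following can be added to the source configuration; the data queue, library, and expression values are placeholders.

```
"as400.write.dataqueue.name": "your-response-data-queue",
"as400.write.dataqueue.library": "your-response-library",
"as400.response.dataqueue.expression": "your-response-expression"
```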
JsonConverter(Source)
Note: Here is a sample configuration for the AS400 source connector with JsonConverter.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type specifies how each entry's key is compared to that value (here, greater than or equal).
You can use any of the supported search types (equal, not equal, greater than, less than, greater than or equal, less than or equal) depending on your requirements.
JsonSchemaConverter(Source)
Note: Here is a sample configuration for the AS400 source connector with JsonSchemaConverter and Schema Registry.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type specifies how each entry's key is compared to that value (here, greater than or equal).
You can use any of the supported search types (equal, not equal, greater than, less than, greater than or equal, less than or equal) depending on your requirements.
AvroConverter(Source)
Note: Here is a sample configuration for the AS400 source connector with AvroConverter.
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "/opt",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When reading data from a keyed data queue on the IBM i (AS/400) platform, you need to specify the key and search type using the as400.read.dataqueue.key and as400.read.dataqueue.key.search.type properties in your configuration.
Note: Here's how you might include these properties in your configuration:
{
"name": "AS400SourceConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400SourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.read.dataqueue.name": "your-data-queue-name",
"as400.read.dataqueue.library": "your-Library",
"as400.source.format.name": "format-file-name",
"as400.source.file.library": "your-format-file-library",
"source.kafka.topic": "Test",
"as400.read.dataqueue.key": "your-key",
"as400.read.dataqueue.key.search.type":"greater than or equal",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
In this example, as400.read.dataqueue.key specifies the key value used to select entries from the keyed data queue, and as400.read.dataqueue.key.search.type specifies how each entry's key is compared to that value (here, greater than or equal).
You can use any of the supported search types (equal, not equal, greater than, less than, greater than or equal, less than or equal) depending on your requirements.
Source Acknowledgement
The source connector can send an acknowledgement response back to the data queue; here is the configuration for sending the response through a program call:
"source.ack.type": "Program",
"as400.program.name": "CDCPGMACKR",
"as400.program.library": "INFOCDCCOM",
"as400.program.libraryList": "INFOCDCDEM,INFOCDCCOM",
AS400 Data Queue Sink Connector Configuration Properties
Configure these connector properties.
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Data Queue | Write data queue name. | Required | null | as400.write.dataqueue.name |
Library | Write data queue library. | Required | null | as400.write.dataqueue.library |
Is Keyed DataQ | Set to true when writing to a keyed data queue and leave false (default) for non-keyed data queues. | Optional | false | as400.write.dataqueue.key |
Format File Name | Optional parameter allows treating data queue entry as an externally defined data structure. When defined, the connector will dynamically retrieve the record format from the specified IBM i file and parse the data queue entry into the map of field name / value pairs. The connector will perform the type conversion, supporting all types such as packed, date / time etc. | Optional | null | as400.sink.format.name |
Format File Library | When format file is specified, the format file library can also be specified, otherwise the format file will be located based on the connection library list. | Optional | null | as400.sink.file.library |
DQ Entry Length | Max DQ Entry Length. When specified and greater than 0, the parameter value will be truncated to fit the max length. | Optional | 0 | as400.dq.entry.length |
DQ Key Length | Max DQ Key Length. When specified and greater than 0, the parameter value will be used (instead of dynamically retrieving it from DQ definitions on the server). | Optional | null | as400.dq.key.length |
JsonConverter(Sink)
Note: Here is a sample configuration for the AS400 Data Queue Sink connector with JsonConverter and Dead Letter Queue.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "json_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "json_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
JsonSchemaConverter(Sink)
Note: Here is a sample configuration for the AS400 Data Queue Sink connector with JsonSchemaConverter, Schema Registry, and Dead Letter Queue.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "jsonSchema-CDCDQ8_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
AvroConverter(Sink)
Note: Here is a sample configuration for the AS400 Data Queue Sink connector with AvroConverter, Schema Registry, and Dead Letter Queue.
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "avro_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
When writing a response to a keyed data queue on the IBM i (AS/400) platform, you would typically need to set the as400.write.dataqueue.key property to true in your configuration. This property informs the connector that it should write the response data to a keyed data queue.
Note: Here's an example of how you might include this property in your configuration:
{
"name": "AS400DataQueueSinkConnectorConnector_0",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400DataQueueSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "Test",
"transforms": "",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "avro_dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.write.dataqueue.name": "your-data-queue-name",
"as400.write.dataqueue.library": "your-library",
"as400.sink.format.name": "format-file-name",
"as400.sink.file.library": "format-file-library",
"as400.write.dataqueue.key":"true",
"value.converter.schema.registry.url": "http://localhost:8081"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
AS400 Program Call Sink Connector Configuration Properties
Configure these connector properties
Parameter | Description | Mandatory | Default Value | configuration keys for parameters |
---|---|---|---|---|
Program Name | AS400 program name | Required | null | as400.program.name |
Program Library | Program library name | Required | null | as400.program.library |
Program Parameters | List of definitions and value references of program parameters | Optional | null | as400.program.parameters |
Procedure Name | Name of the procedure | Optional | null | as400.procedure.name |
Procedure Returns Value | Indicator if the program procedure returns a value. | Optional | false | as400.procedure.returnsValue |
Threadsafe | Indicator if the program is thread safe | Optional | false | as400.threadsafe |
Sink Target Topic | Topic to which the program call output is pushed. | Required | null | sink.kafka.topic |
Kafka Partition Key | Kafka partition key used to write the record to a specific partition of the topic. | Required | null | sink.kafka.partition.key |
Note: Here is a sample configuration for the AS400 Program Call Sink connector.
{
"name": "AS400ProgramCallSinkConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "CRTORDER_INPUT",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.program.name": "program-name",
"as400.program.library": "KFKDEMOS",
"as400.program.libraryList": "comma-separated-list-of-libraries"
"as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0,
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\":
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\",
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\":
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ {
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5,
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\",
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
"as400.threadsafe": "false",
"value.converter.schemas.enable": "false"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
Note: The connector can call both Programs (*PGM objects) and Service Programs (*SRVPGM objects) on the IBM i (AS/400) platform. Here's how you can use the as400.procedure.name property to define procedures when calling these objects:
For PGM Object:
When calling a Program (*PGM object), you typically don't need to specify a procedure name, as Programs in this context are generally standalone units of code that are executed without calling a specific procedure within them.
For SRVPGM Object:
Service Programs (*SRVPGM objects) are designed to be reused by multiple programs, and they can contain multiple exported procedures that can be called individually. When calling a Service Program and you intend to use a specific procedure within it, the procedure name must match the exported name of the procedure you want to execute.
The as400.procedure.name property can be used in the connector configuration to specify the procedure name when calling a Service Program.
Note: Here's an example of how you might use it in your connector configuration:
{
"name": "AS400ProgramCallSinkConnector",
"config": {
"connector.class": "com.infoviewsystems.kafka.connect.as400.core.AS400ProgramCallSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "CRTORDER_INPUT",
"as400.url": "your-ibm-i-host",
"as400.userId": "your-username",
"as400.password": "your-password",
"as400.secure.connection": "false",
"as400.license.protocol": "FILE",
"as400.license.path": "license-file-path",
"license.fileName": "as400-license.lic",
"as400.program.name": "program-name",
"as400.program.library": "KFKDEMOS",
"as400.program.libraryList": "comma-separated-list-of-libraries",
"as400.program.libraryList.mode": "ADD_FIRST",
"as400.program.parameters": "{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0,
\"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\":
\"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\",
\"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\":
[] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\":
\"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\",
\"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\":
\"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ {
\"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\":
1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5,
\"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\",
\"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }",
"as400.threadsafe": "false",
"value.converter.schemas.enable": "false",
"as400.procedure.name":"your-procedure-name"
/*Below are optional (Required if security enabled) */
// "security.protocol": "SASL_SSL",
// "sasl.mechanism": "PLAIN/SCRAM",
// "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"GV6LHRS7U\" password=\"uFbVuXMuKNrzUG8Nu0aSJPMOj\";"
}
}
IBM i program parameter definitions:
```
d main pr extpgm('CRTORDER')
d srcOrder 8p 0
d trgtOrder 8p 0
d ordAmt 10p 2
d ordSts 10a
d nbrLines 4p 0
d linesIn likeds(linesInFmt) dim(10)
d linesInfmt DS Dim(10) qualified
d itemNo 10a
d qty 5p 0
d price 10p 2
```
Example: Input Structure
```
{
"sourceOrderId": 11934700,
"targetOrderId": 11934700,
"orderAmt": 1000.00,
"orderStatus": "Activated",
"noOfItems": 2,
"linesIn": [
{
"itemNo": "111",
"quantity": 10,
"price": 10.00
},
{
"itemNo": "222",
"quantity": 90,
"price": 10.00
}
]
}
```
as400.program.parameters: the program parameters have to be transformed as shown below:
```{ "params": [ { "parameterName": "sourceOrderId", "sourceFieldName": "$['sourceOrderId']", "dataType": "PACKED", "length": 8, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "targetOrderId", "sourceFieldName": "$['targetOrderId']", "dataType": "PACKED", "length": 8, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderAmt", "sourceFieldName": "$['orderAmt']", "dataType": "PACKED", "length": 10, "decimalPositions": 2, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderStatus", "sourceFieldName": "$['orderStatus']", "dataType": "STRING", "length": 10, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "numberOfLines", "sourceFieldName": "$['noOfItems']", "dataType": "PACKED", "length": 4, "decimalPositions": 0, "usage": "IN", "count": 1, "dataStructureElements": [] }, { "parameterName": "orderLines", "sourceFieldName": "$['linesIn']", "dataType": "STRUCTURE", "length": 0, "decimalPositions": 0, "usage": "INOUT", "count": 10, "dataStructureElements": [ { "parameterName": "itemNo", "sourceFieldName": "$['itemNo']", "dataType": "STRING", "length": 10, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "quantity", "sourceFieldName": "$['quantity']", "dataType": "PACKED", "length": 5, "decimalPositions": 0, "usage": "INOUT", "count": 1, "dataStructureElements": [] }, { "parameterName": "price", "sourceFieldName": "$['price']", "dataType": "PACKED", "length": 10, "decimalPositions": 2, "usage": "INOUT", "count": 1, "dataStructureElements": [] } ] } ] }
**Note:** Input structure field names should match the respective sourceFieldName values so that the connector can map values accordingly.
Once you have the above structure, it needs to be stringified; the result will be as shown below:
{ \"params\": [ { \"parameterName\": \"sourceOrderId\", \"sourceFieldName\": \"$['sourceOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"targetOrderId\", \"sourceFieldName\": \"$['targetOrderId']\", \"dataType\": \"PACKED\", \"length\": 8, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderAmt\", \"sourceFieldName\": \"$['orderAmt']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderStatus\", \"sourceFieldName\": \"$['orderStatus']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"numberOfLines\", \"sourceFieldName\": \"$['noOfItems']\", \"dataType\": \"PACKED\", \"length\": 4, \"decimalPositions\": 0, \"usage\": \"IN\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"orderLines\", \"sourceFieldName\": \"$['linesIn']\", \"dataType\": \"STRUCTURE\", \"length\": 0, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 10, \"dataStructureElements\": [ { \"parameterName\": \"itemNo\", \"sourceFieldName\": \"$['itemNo']\", \"dataType\": \"STRING\", \"length\": 10, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"quantity\", \"sourceFieldName\": \"$['quantity']\", \"dataType\": \"PACKED\", \"length\": 5, \"decimalPositions\": 0, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] }, { \"parameterName\": \"price\", \"sourceFieldName\": \"$['price']\", \"dataType\": \"PACKED\", \"length\": 10, \"decimalPositions\": 2, \"usage\": \"INOUT\", \"count\": 1, \"dataStructureElements\": [] } ] } ] }
The configuration for the Program Call Sink connector is now ready to deploy.
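As a sketch of the deployment step, the finished JSON can be submitted to the Kafka Connect REST API with curl. This assumes the configuration shown above is saved as program-call-sink.json (a hypothetical file name) and that the Connect worker's REST endpoint is http://localhost:8083.

```bash
# Deploy the Program Call Sink connector through the Kafka Connect REST API
# (assumes the configuration JSON is saved as program-call-sink.json).
curl -X POST -H "Content-Type: application/json" \
     --data @program-call-sink.json \
     http://localhost:8083/connectors

# Check the connector and task status after deployment.
curl http://localhost:8083/connectors/AS400ProgramCallSinkConnector/status
```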
## TLS Configuration
| Parameter | Description | Mandatory | Default Value |
|---------------|-----------------------------------------------------------|-----------------------------------|-------------|
| Truststore | A truststore is used to store JKS-type certificates from Certificate Authorities (CA) that verify the certificate presented by the server in an SSL connection. | | |
![image](https://user-images.githubusercontent.com/46368616/133774025-d397bcd4-b88f-49a7-b06f-305b0dac9f5b.png)
## Truststore
| Parameter | Description |Mandatory |Default Value|configuration keys for parameters|
|---------------|-----------------------------------------------------------|-----------------------------------|-------------|---------------------------------|
|Truststore Filename| truststore file name|optional|null|truststore.fileName|
|Password|The password used to protect the trust store.|Optional|null|TLS.password|
|Insecure|If true, no certificate validations are performed, rendering connections vulnerable to attacks. Use at your own risk.|Optional|false|TLS.isInsecure|
|IsKeystoreConfigured|-|Optional|true|TLS.isKeystoreConfigure|
|IsTruststoreConfigured|-|Optional|true|TLS.isTruststoreConfigured|
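As an illustration, a secure connection that loads the truststore from the local file system might combine the keys above with the FILE-protocol keys from the Truststore Management section, roughly as in the sketch below; the path and password values are placeholders.

```
"as400.secure.connection": "true",
"as400.tlsFile.path": "/opt",
"truststore.fileName": "info400new.truststore",
"TLS.password": "your-truststore-password"
```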
## Schema Registry Configuration
Schema Registry must be configured for schema converters to avoid problems with registering updated schemas after the Format File is changed.
**Avro**
```properties
io.confluent.connect.avro.AvroConverter
```
**JSON Schema**
```properties
io.confluent.connect.json.JsonSchemaConverter
```
Schema Registry can be configured in 3 ways:

1. Through Confluent Control Center
Open the Topics menu and choose the topic where new messages are published. Then click on the Schema field; you will see the already registered schema. Click on the pull-down menu (...) and choose Compatibility settings.
Then set the compatibility level to NONE and save the changes.
These changes apply only to the edited topic. If you have other topics where messages are published with a schema, you also need to repeat these steps for them. If you don't want to repeat these steps for every new topic, use the other options below.
2. Through the Confluent Platform properties file
Open the Confluent installation folder and go to the /etc/schema-registry/ folder.
Edit the schema-registry.properties file with the following command:
nano schema-registry.properties
Add a new property at the end of the file:
schema.compatibility.level=none
Save the changes and restart Confluent Platform if needed to apply them. Compatibility will default to NONE for new schemas.
3. Through the Docker Compose file
Add a new property for the schema-registry container in the environment section:
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: none
Then restart the Docker containers with the following command, if needed, to apply the changes:
docker-compose up -d
Compatibility will default to NONE for new schemas.
Log Exceptions
The connector can log all critical exceptions to a specific topic (as400-error-log) when log.exceptions=true. The default value of log.exceptions is false, which means the connector will not log any exceptions to the topic.
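For example, after setting "log.exceptions": "true" in the connector configuration, the error topic can be inspected with the standard console consumer; the bootstrap server address below is a placeholder for your environment.

```bash
# Consume the critical exceptions logged by the connector
# (assumes a broker reachable at localhost:9092).
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic as400-error-log \
  --from-beginning
```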
Contact Us
Contact us for connector pricing info, trial license, or support questions.