I am trying to run a Python job using Apache Flink 1.15.2, as per the repo https://github.com/aws-samples/pyflink-getting-started.git
The repo lists 4 steps, and I am stuck on the first one: 1) Local development using PyFlink.
I have done all the prerequisites, like creating a conda virtual environment and installing the required Flink version, i.e. apache-flink 1.15.2.
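A quick way to confirm what is actually installed in the conda env (just a sanity-check sketch, not part of the repo) is:

# sanity check: confirm the installed package versions inside the conda env
from importlib.metadata import version

print(version("apache-flink"))  # expected: 1.15.2
print(version("boto3"))         # used by the stock.py data generator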
I am following the steps in https://github.com/aws-samples/pyflink-getting-started/tree/main/getting-started
I have also downloaded and saved the .jar file (the Kinesis connector) in the lib subdirectory of the current directory.
- There is an input stream generator script, stock.py, which runs fine:
import datetime
import json
import random

import boto3

STREAM_NAME = "ExampleInputStream"
STREAM_REGION = "eu-west-2"


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=STREAM_REGION))
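Independently of Flink, the records can be read back from the stream with boto3 to confirm the generator really writes data. This is only a rough sketch and assumes a single-shard stream, with stock.py running in parallel:

import time
import boto3

STREAM_NAME = "ExampleInputStream"
STREAM_REGION = "eu-west-2"

client = boto3.client('kinesis', region_name=STREAM_REGION)

# assumes a single-shard stream; grab its shard id
shard_id = client.list_shards(StreamName=STREAM_NAME)['Shards'][0]['ShardId']

# start reading from "now", so run stock.py at the same time
iterator = client.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType='LATEST')['ShardIterator']

for _ in range(5):
    resp = client.get_records(ShardIterator=iterator, Limit=10)
    for record in resp['Records']:
        print(record['Data'])
    iterator = resp['NextShardIterator']
    time.sleep(1)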
- The issue comes up while running the output stream code, getting-started.py:
# -*- coding: utf-8 -*-
"""
getting-started.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()

# print(os.environ.get("IS_LOCAL"))

APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
        # "file:///" + "C:/lib/flink-sql-connector-kinesis-1.15.2.jar",
    )


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def create_sink_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )


def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_print_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, input_table_name))

    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
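To isolate the connector lookup from AWS credentials and the properties file, a stripped-down probe along the following lines should trigger the same 'kinesis' factory discovery. The jar path and table definition here are only illustrative, mirroring the script above:

# minimal probe: does the Table API see the 'kinesis' connector factory at all?
import os
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# assumed layout: lib/flink-sql-connector-kinesis-1.15.2.jar next to this script
jar_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                        "lib", "flink-sql-connector-kinesis-1.15.2.jar")
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///" + jar_path.replace("\\", "/"))

table_env.execute_sql("""
    CREATE TABLE probe (
        ticker VARCHAR(6),
        price DOUBLE
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'ExampleInputStream',
        'aws.region' = 'eu-west-2',
        'scan.stream.initpos' = 'LATEST',
        'format' = 'json'
    )
""")

# planning the query forces factory discovery without actually submitting a job
print(table_env.from_path("probe").explain())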
I run into the error: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Here is the full stack trace:
File ".getting-started.py", line 181, in <module>
main()
File ".getting-started.py", line 170, in main
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
File "C:Usersa856434AppDataLocalanaconda3envsmy-aws-flink-envlibsite-packagespyflinktabletable_environment.py", line 828, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "C:Usersa856434AppDataLocalanaconda3envsmy-aws-flink-envlibsite-packagespy4jjava_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:Usersa856434AppDataLocalanaconda3envsmy-aws-flink-envlibsite-packagespyflinkutilexceptions.py", line 146, in deco
return f(*a, **kw)
File "C:Usersa856434AppDataLocalanaconda3envsmy-aws-flink-envlibsite-packagespy4jprotocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.
Table options are:
'aws.region'='eu-west-2'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'stream'='ExampleInputStream '
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kinesis'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:728)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:702)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:155)
... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:538)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:724)
... 36 more
Here is the application_properties.json file:
[
    {
        "PropertyGroupId": "kinesis.analytics.flink.run.options",
        "PropertyMap": {
            "python": "GettingStarted/getting-started.py",
            "jarfile": "GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar"
        }
    },
    {
        "PropertyGroupId": "consumer.config.0",
        "PropertyMap": {
            "input.stream.name": "ExampleInputStream ",
            "flink.stream.initpos": "LATEST",
            "aws.region": "eu-west-2"
        }
    },
    {
        "PropertyGroupId": "producer.config.0",
        "PropertyMap": {
            "output.stream.name": "ExampleOutputStream",
            "shard.count": "1",
            "aws.region": "eu-west-2"
        }
    }
]
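For reference, those property groups resolve locally the same way property_map() in getting-started.py resolves them; a small sketch (file assumed to be in the working directory, repr() just makes any stray whitespace in the names visible):

import json

with open("application_properties.json") as f:
    props = json.load(f)

def property_map(props, group_id):
    # same lookup as in getting-started.py
    for prop in props:
        if prop["PropertyGroupId"] == group_id:
            return prop["PropertyMap"]

consumer = property_map(props, "consumer.config.0")
producer = property_map(props, "producer.config.0")
print(repr(consumer["input.stream.name"]), consumer["aws.region"])
print(repr(producer["output.stream.name"]), producer["aws.region"])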
I was expecting the output stream to show some results (see attached screenshot: output stream).
I rechecked the path of the jar file; it exists at the path specified. I even tried a small Python test program to check:
a) whether the lib directory exists
b) whether the .jar file is accessible in that directory
(see attached screenshot: dir_structure_files)
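The check was roughly along these lines (run from the same directory as getting-started.py), and both came back fine:

import os

# same path construction as in getting-started.py
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
jar_path = os.path.join(CURRENT_DIR, "lib", "flink-sql-connector-kinesis-1.15.2.jar")

print(os.path.isdir(os.path.join(CURRENT_DIR, "lib")))  # a) the lib dir exists -> True
print(os.path.isfile(jar_path))                         # b) the jar is accessible -> True
print(jar_path)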
But I am still running into the connector issue. I rechecked the PyFlink documentation for this error and for how .jar dependencies need to be specified, and it matches exactly how the program specifies them:
https://pyflink.readthedocs.io/en/main/faq.html#connector-issues
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/dependency_management/#jar-dependencies
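The documented form for jar dependencies, which matches what the program above already does, is essentially (path here is illustrative only):

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# jars are passed as file:/// URLs; multiple jars are separated by ';'
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/lib/flink-sql-connector-kinesis-1.15.2.jar")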
I am not using a fat jar, since it is just one jar file and it should work as per the steps in the repo.
Any help would be much appreciated. The reason I am using version 1.15.2 is that we eventually want to write Beam applications (currently not supported beyond Flink 1.15.2).