File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
element.data)
File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 158, in py_meteor.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 174, in py_meteor.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 92, in py_meteor.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
File "py_meteor/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in py_meteor.fn_execution.beam.beam_coder_impl_fast._MeteorLengthPrefixCoderBeamWrapper.encode_to_stream
File "py_meteor/fn_execution/coder_impl_fast.pyx", line 294, in py_meteor.fn_execution.coder_impl_fast.ValueCoderImpl.encode_to_stream
File "py_meteor/fn_execution/coder_impl_fast.pyx", line 339, in py_meteor.fn_execution.coder_impl_fast.FlattenRowCoderImpl.encode_to_stream
File "py_meteor/fn_execution/coder_impl_fast.pyx", line 588, in py_meteor.fn_execution.coder_impl_fast.CharCoderImpl.encode_to_stream
AttributeError: 'float' object has no attribute 'encode'
原因:Python 返回的结果应该是 string 类型,但实际返回的是 float 类型。
相关代码在 coder_impl.py 的 CharCoderImpl 中:
class CharCoderImpl(StreamCoderImpl):
    """Coder that writes a string as a big-endian int32 byte length followed by its UTF-8 bytes."""

    def encode_to_stream(self, value, out_stream, nested):
        """Write ``value`` to ``out_stream`` as a length prefix plus UTF-8 payload.

        ``nested`` is not used by this method.
        """
        # encode() is called here, so ``value`` must provide ``.encode`` (a str);
        # a float raises the AttributeError shown in the traceback above.
        bytes_value = value.encode("utf-8")
        # The prefix is the length of the encoded bytes, not of the original string.
        out_stream.write_bigendian_int32(len(bytes_value))
        # NOTE(review): the ``False`` argument presumably tells the stream not to add
        # its own length prefix -- confirm against the stream implementation.
        out_stream.write(bytes_value, False)

    def decode_from_stream(self, in_stream, nested):
        """Read a big-endian int32 size, then that many bytes, and decode them as UTF-8.

        ``nested`` is not used by this method.
        """
        size = in_stream.read_bigendian_int32()
        return in_stream.read(size).decode("utf-8")
2. Missing 1 required positional argument
报这个错有2种可能:
- 对象没有实例化就直接调用了实例方法;
- 调用实例方法时参数不正确,比如需要一个参数,但一个参数都没传。
导致这个错误的原因是生成物理计划的时候给python UDF传的input不正确。
3. The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:131)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:110)
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:100)
    at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:62)
    at org.apache.flink.table.runtime.operators.python.PythonExternalPredictFunctionOperator.open(PythonExternalPredictFunctionOperator.java:76)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Flink 新版本有变动,需要在使用了 Python UDF 的 Transformation 上调用:
ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
4. org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan cannot be cast to org.apache.calcite.plan.volcano.RelSubset
物理节点 StreamPhysicalExternalModelPredict 的 copy 方法应该使用方法参数 inputs,并从中获取输入;因为字段名太像,之前误写成了 input。
// Rebuilds this node from the `inputs` list passed to this method. The first
// element must be used, not the similarly named `input` field.
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
  val newInput: RelNode = inputs.get(0)
  new StreamPhysicalExternalModelPredict(
    cluster,
    traitSet,
    inputSchema,
    newInput,
    scan,
    outputRowType,
    aiFunctionName,
    aiModelName)
}



