栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink实现外部模型预测遇到的问题

Flink实现外部模型预测遇到的问题

1. ‘float’ object has no attribute ‘encode’
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in 
    lambda: self.create_worker().do_instruction(request), request)
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/opt/anaconda3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 158, in py_meteor.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 174, in py_meteor.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "py_meteor/fn_execution/beam/beam_operations_fast.pyx", line 92, in py_meteor.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
  File "py_meteor/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in py_meteor.fn_execution.beam.beam_coder_impl_fast._MeteorLengthPrefixCoderBeamWrapper.encode_to_stream
  File "py_meteor/fn_execution/coder_impl_fast.pyx", line 294, in py_meteor.fn_execution.coder_impl_fast.ValueCoderImpl.encode_to_stream
  File "py_meteor/fn_execution/coder_impl_fast.pyx", line 339, in py_meteor.fn_execution.coder_impl_fast.FlattenRowCoderImpl.encode_to_stream
  File "py_meteor/fn_execution/coder_impl_fast.pyx", line 588, in py_meteor.fn_execution.coder_impl_fast.CharCoderImpl.encode_to_stream
AttributeError: 'float' object has no attribute 'encode'

python返回的结果应该是string类型结果返回的是float类型导致的。

代码在coder_impl.py, CharCoderImpl

class CharCoderImpl(StreamCoderImpl):

    def encode_to_stream(self, value, out_stream, nested):
        #这里调用了encode方法
        bytes_value = value.encode("utf-8")
        out_stream.write_bigendian_int32(len(bytes_value))
        out_stream.write(bytes_value, False)

    def decode_from_stream(self, in_stream, nested):
        size = in_stream.read_bigendian_int32()
        return in_stream.read(size).decode("utf-8")
2. Missing 1 required positional argument

报这个错有2种可能:

    对象没有实例化直接调用了实例方法调用实例方法的参数不正确,比如需要一个参数,但是一个参数没传

导致这个错误的原因是生成物理计划的时候给python UDF传的input不正确。

3. The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type “Python” was missing or set to 0 for the config option “taskmanager.memory.managed.consumer-weights”.0.0
Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:131)
	at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:110)
	at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:100)
	at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:62)
	at org.apache.flink.table.runtime.operators.python.PythonExternalPredictFunctionOperator.open(PythonExternalPredictFunctionOperator.java:76)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)

Flink新版本有变动,需要在使用了Python UDF的Transformation上调用

ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
4. org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan cannot be cast to org.apache.calcite.plan.volcano.RelSubset

物理节点StreamPhysicalExternalModelPredict的copy方法应该使用方法里inputs,从里面获取输入,因为字段名太像了写成了input.

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
   new StreamPhysicalExternalModelPredict(cluster, traitSet, inputSchema,
     inputs.get(0), scan, outputRowType, aiFunctionName, aiModelName)
 }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742745.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号