栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spark 之 OnHeapColumnVector

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark 之 OnHeapColumnVector

allocateColumns
 
  /**
   * Allocates one on-heap column vector for every field of {@code schema}.
   *
   * <p>Convenience overload: the schema only contributes its field array, so this simply
   * forwards to the {@code StructField[]} variant.
   *
   * @param capacity number of rows each vector is initially sized for
   * @param schema   the struct whose fields determine the vector types
   * @return the allocated vectors, one per schema field, in field order
   */
  public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
    return OnHeapColumnVector.allocateColumns(capacity, schema.fields());
  }
 
  public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
    OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
    for (int i = 0; i < fields.length; i++) {
      vectors[i] = new onHeapColumnVector(capacity, fields[i].dataType());
    }
    
OnHeapColumnVector 构造函数
/**
 * Creates an on-heap column vector of the given type with room for {@code capacity} rows.
 *
 * <p>The parent constructor records {@code capacity} and builds any child columns; this
 * constructor then allocates this vector's own backing arrays and resets its state.
 *
 * @param capacity initial number of rows the vector can hold
 * @param type     the Spark SQL data type stored in this vector
 */
public OnHeapColumnVector(int capacity, DataType type) {
  super(capacity, type);

  // Allocate the arrays (data / offsets / null flags) that match `type`.
  reserveInternal(capacity);
  reset();
}

reserveInternal 主要用于为以下数组预留（分配）空间：

// Per-row null flags. A 0 entry marks a row as not null (putNotNulls writes 0).
// This is faster than a boolean array and we optimize this over memory footprint.
  private byte[] nulls;

  // Array for each type. only 1 is populated for any type.
  // reserveInternal picks it from `type`: byteData for Boolean/Byte, shortData for Short,
  // intData for Integer/Date/32-bit decimals, longData for Long/Timestamp/64-bit decimals,
  // floatData for Float, doubleData for Double.
  private byte[] byteData;
  private short[] shortData;
  private int[] intData;
  private long[] longData;
  private float[] floatData;
  private double[] doubleData;

  // only set if type is Array or Map.
  // Allocated by reserveInternal when isArray() is true or the type is a MapType; presumably
  // holds each row's (length, offset) as passed to putArray — confirm against putArray.
  private int[] arrayLengths;
  private int[] arrayOffsets;
  // Split this function out since it is the slow path.
  /**
   * Grows this vector's backing storage so it can hold {@code newCapacity} rows.
   *
   * <p>Only the storage that matches {@code type} is touched: the length/offset arrays for
   * array and map types, or exactly one primitive data array otherwise. Existing entries in
   * {@code [0, capacity)} are copied into the new arrays. The per-row null flags are always
   * reallocated, and {@code capacity} is set to {@code newCapacity} on return.
   *
   * <p>Types with child columns and none of the storage above need nothing here; their
   * values live in the children.
   *
   * @param newCapacity number of rows the vector must hold afterwards; presumably never less
   *                    than the current {@code capacity}, since the copies read
   *                    {@code capacity} entries — TODO confirm with callers
   * @throws RuntimeException if {@code type} has no storage mapping and no child columns
   */
  @Override
  protected void reserveInternal(int newCapacity) {
    if (isArray() || type instanceof MapType) {
      int[] newLengths = new int[newCapacity];
      int[] newOffsets = new int[newCapacity];
      if (this.arrayLengths != null) {
        System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
        System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
      }
      arrayLengths = newLengths;
      arrayOffsets = newOffsets;
    } else if (type instanceof BooleanType || type instanceof ByteType) {
      // Booleans and bytes share the same byte[] storage, so a single branch handles both.
      if (byteData == null || byteData.length < newCapacity) {
        byte[] newData = new byte[newCapacity];
        if (byteData != null) System.arraycopy(byteData, 0, newData, 0, capacity);
        byteData = newData;
      }
    } else if (type instanceof ShortType) {
      if (shortData == null || shortData.length < newCapacity) {
        short[] newData = new short[newCapacity];
        if (shortData != null) System.arraycopy(shortData, 0, newData, 0, capacity);
        shortData = newData;
      }
    } else if (type instanceof IntegerType || type instanceof DateType ||
        DecimalType.is32BitDecimalType(type)) {
      if (intData == null || intData.length < newCapacity) {
        int[] newData = new int[newCapacity];
        if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity);
        intData = newData;
      }
    } else if (type instanceof LongType || type instanceof TimestampType ||
        DecimalType.is64BitDecimalType(type)) {
      if (longData == null || longData.length < newCapacity) {
        long[] newData = new long[newCapacity];
        if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
        longData = newData;
      }
    } else if (type instanceof FloatType) {
      if (floatData == null || floatData.length < newCapacity) {
        float[] newData = new float[newCapacity];
        if (floatData != null) System.arraycopy(floatData, 0, newData, 0, capacity);
        floatData = newData;
      }
    } else if (type instanceof DoubleType) {
      if (doubleData == null || doubleData.length < newCapacity) {
        double[] newData = new double[newCapacity];
        if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, capacity);
        doubleData = newData;
      }
    } else if (childColumns != null) {
      // Nothing to store: values live in the child columns.
    } else {
      throw new RuntimeException("Unhandled " + type);
    }

    // Null flags are needed for every type, so they are always resized.
    byte[] newNulls = new byte[newCapacity];
    if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, capacity);
    nulls = newNulls;

    capacity = newCapacity;
  }

构造函数中调用的 reset() 是父类 WritableColumnVector 中的实现：

  /**
   * Clears this vector so it can be refilled from row 0.
   *
   * <p>Constant vectors are left untouched. Otherwise every child column is reset
   * recursively and the append count is rewound to zero. If any nulls were recorded, all
   * {@code capacity} rows are marked not-null again and the null count is cleared.
   */
  public void reset() {
    if (!isConstant) {
      if (childColumns != null) {
        for (int idx = 0; idx < childColumns.length; idx++) {
          ((WritableColumnVector) childColumns[idx]).reset();
        }
      }
      elementsAppended = 0;
      if (numNulls > 0) {
        putNotNulls(0, capacity);
        numNulls = 0;
      }
    }
  }

nulls 数组默认全为 0，表示所有值都不为空，所以 put 数值时可以不调用 putNotNull。reset() 中用到的 putNotNulls 实现如下：

/**
 * Marks {@code count} consecutive rows, starting at {@code rowId}, as not null.
 *
 * <p>Skipped entirely when {@code hasNull()} reports no nulls.
 *
 * @param rowId first row to clear
 * @param count number of rows to clear
 */
  @Override
  public void putNotNulls(int rowId, int count) {
    if (hasNull()) {
      int row = rowId;
      int remaining = count;
      while (remaining > 0) {
        nulls[row++] = 0;
        remaining--;
      }
    }
  }
super(capacity, type) 调用父类 WritableColumnVector 的构造函数，主要用于初始化子列 childColumns。
  
  /**
   * Records the row capacity and, for complex types, creates the child columns.
   *
   * <p>This constructor does not allocate storage for this vector's own values; that is left
   * to the subclass constructor. Child columns are created through the overridable
   * {@code reserveNewColumn}, so the subclass decides the child implementation. Note that this
   * is a virtual call made before the subclass constructor body has run.
   *
   * @param capacity initial number of rows this vector can hold
   * @param type     the Spark SQL data type of this vector
   */
  protected WritableColumnVector(int capacity, DataType type) {
    super(type);
    this.capacity = capacity;

    if (isArray()) {
      // isArray() types keep all elements in a single child column (childColumns[0]).
      DataType childType;
      int childCapacity = capacity;
      if (type instanceof ArrayType) {
        childType = ((ArrayType)type).elementType();
      } else {
        // Other isArray() types are stored as a byte child column with DEFAULT_ARRAY_LENGTH
        // bytes reserved per row (presumably string/binary values — confirm against isArray()).
        childType = DataTypes.ByteType;
        childCapacity *= DEFAULT_ARRAY_LENGTH;
      }
      this.childColumns = new WritableColumnVector[1];
      this.childColumns[0] = reserveNewColumn(childCapacity, childType);
    } else if (type instanceof StructType) {
      // One child column per struct field, each with this vector's capacity.
      StructType st = (StructType)type;
      this.childColumns = new WritableColumnVector[st.fields().length];
      for (int i = 0; i < childColumns.length; ++i) {
        this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
      }
    } else if (type instanceof MapType) {
      // Child 0 holds the keys, child 1 holds the values.
      MapType mapType = (MapType) type;
      this.childColumns = new WritableColumnVector[2];
      this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
      this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
    } else if (type instanceof CalendarIntervalType) {
      // Three columns. Months as int. Days as Int. Microseconds as Long.
      this.childColumns = new WritableColumnVector[3];
      this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
      this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
      this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
    } else {
      // Primitive types store their values directly; no children.
      this.childColumns = null;
    }
  }
对于复杂类型（complex type），父类构造函数会调用子类实现的 reserveNewColumn 方法来创建子列：
 /**
  * Child-column factory used by the WritableColumnVector constructor for complex types.
  * Returning OnHeapColumnVector makes child columns use the same array-backed storage as
  * this class, which is what makes construction of nested types recursive.
  *
  * @param capacity initial row capacity of the child column
  * @param type     data type of the child column
  * @return a new OnHeapColumnVector of {@code type}
  */
  @Override
  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
    return new OnHeapColumnVector(capacity, type);
  }

reserveNewColumn 又会调用子类 OnHeapColumnVector 的构造函数，由此形成递归：每一层嵌套类型都会为自己创建子列。

Primitive ColumnVector 使用
testVectors("int", 10, IntegerType) { testVector =>
    // Append 0..9, then read them back through a ColumnarArray over rows 0-9 and its copy().
    for (i <- 0 until 10) testVector.appendInt(i)

    val view = new ColumnarArray(testVector, 0, 10)
    val copied = view.copy()

    for (i <- 0 until 10) {
      assert(view.get(i, IntegerType) === i)
      assert(copied.get(i, IntegerType) === i)
    }
  }
ArrayType ColumnVector 使用

spark/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala

 val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
  testVectors("array", 10, arrayType) { testVector =>
    // Shared child data 0, 1, 2, 3, 4, 5; each array row is an (offset, length) slice of it.
    val data = testVector.arrayData()
    (0 until 6).foreach(i => data.putInt(i, i))

    // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
    testVector.putArray(0, 0, 1)
    testVector.putArray(1, 1, 2)
    testVector.putArray(2, 3, 0)
    testVector.putArray(3, 3, 3)

    assert(testVector.getArray(0).toIntArray() === Array(0))
    assert(testVector.getArray(1).toIntArray() === Array(1, 2))
    assert(testVector.getArray(2).toIntArray() === Array.empty[Int])
    assert(testVector.getArray(3).toIntArray() === Array(3, 4, 5))
  }
MapType ColumnVector 使用
test("Int Map") {
  (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
    val column = allocate(10, new MapType(IntegerType, IntegerType, false), memMode)
    try {
      // Child 0 holds keys (i) and child 1 holds values (2 * i), for i in 0..5.
      (0 to 1).foreach { colIndex =>
        val data = column.getChild(colIndex)
        (0 to 5).foreach { i =>
          data.putInt(i, i * (colIndex + 1))
        }
      }

      // Populate it with maps [0->0], [1->2, 2->4], null, [], [3->6, 4->8, 5->10]
      column.putArray(0, 0, 1)
      column.putArray(1, 1, 2)
      column.putNull(2)
      assert(column.getMap(2) == null)
      column.putArray(3, 3, 0)
      column.putArray(4, 3, 3)

      assert(column.getMap(0).numElements == 1)
      assert(column.getMap(1).numElements == 2)
      assert(column.isNullAt(2))
      assert(column.getMap(3).numElements == 0)
      assert(column.getMap(4).numElements == 3)
    } finally {
      // Always release the vector, even when an assertion fails. For OFF_HEAP this presumably
      // frees memory the garbage collector will not reclaim.
      column.close()
    }
  }
}
StringType ColumnVector 使用
  testVectors("string", 10, StringType) { testVector =>
    // Append "str0".."str9" as UTF-8 bytes. `foreach`, not `map`: the loop runs only for its
    // side effect, so no result collection should be built.
    (0 until 10).foreach { i =>
      val utf8 = s"str$i".getBytes(java.nio.charset.StandardCharsets.UTF_8)
      testVector.appendByteArray(utf8, 0, utf8.length)
    }

    val array = new ColumnarArray(testVector, 0, 10)
    val arrayCopy = array.copy()

    (0 until 10).foreach { i =>
      assert(array.get(i, StringType) === UTF8String.fromString(s"str$i"))
      assert(arrayCopy.get(i, StringType) === UTF8String.fromString(s"str$i"))
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781591.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号