数据库教程:JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)

apache arrow是是各种大数据工具(包括bigquery)使用的一种流行格式,它是平面和分层数据的存储格式。它是一种加快应用程序内存密集型。数据处理和数据科学领域中的常用库: apache a

apache arrow是是各种大数据工具(包括bigquery)使用的一种流行格式,它是平面和分层数据的存储格式。它是一种加快应用程序内存密集型。

数据处理和数据科学领域中的常用库: apache arrow 。诸如apache parquet,apache spark,pandas之类的开放源代码项目以及许多商业或封闭源代码服务都使用arrow。它提供以下功能:

  • 内存计算
  • 标准化的柱状存储格式
  • 一个ipc和rpc框架,分别用于进程和节点之间的数据交换

让我们看一看在arrow出现之前事物是如何工作的:

JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)

我们可以看到,为了使spark从parquet文件中读取数据,我们需要以parquet格式读取和反序列化数据。这要求我们通过将数据加载到内存中来制作数据的完整副本。首先,我们将数据读入内存缓冲区,然后使用parquet的转换方法将数据(例如字符串或数字)转换为我们的编程语言的表示形式。这是必需的,因为parquet表示的数字与python编程语言表示的数字不同。

由于许多原因,这对于性能来说是一个很大的问题:

  • 我们正在复制数据并在其上运行转换步骤。数据的格式不同,我们需要对所有数据进行读取和转换,然后再对数据进行任何计算。
  • 我们正在加载的数据必须放入内存中。您只有8gb的ram,数据是10gb吗?你真倒霉!

现在,让我们看一下apache arrow如何改进这一点:

JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)

arrow无需复制和转换数据,而是了解如何直接读取和操作数据。为此,arrow社区定义了一种新的文件格式以及直接对序列化数据起作用的操作。可以直接从磁盘读取此数据格式,而无需将其加载到内存中并转换/反序列化数据。当然,部分数据仍将被加载到ram中,但您的数据不必放入内存中。arrow使用其文件的内存映射功能,仅在必要和可能的情况下将尽可能多的数据加载到内存中。

apache arrow支持以下语言:

  • c++
  • c#
  • go
  • java
  • javascript
  • rust
  • python (through the c++ library)
  • ruby (through the c++ library)
  • r (through the c++ library)
  • matlab (through the c++ library).

arrow特点

arrow首先是提供用于内存计算的列式数据结构的库,可以将任何数据解压缩并解码为arrow柱状数据结构,以便随后可以对解码后的数据进行内存内分析。arrow列格式具有一些不错的属性:随机访问为o(1),每个值单元格在内存中的前一个和后一个相邻,因此进行迭代非常有效。

apache arrow定义了一种二进制“序列化”协议,用于安排arrow列数组的集合(称为“记录批处理”),该数组可用于消息传递和进程间通信。您可以将协议放在任何地方,包括磁盘上,以后可以对其进行内存映射或读入内存并发送到其他地方。

arrow协议的设计目的是使您可以“映射”一个arrow数据块而不进行任何反序列化,因此对磁盘上的arrow协议数据执行分析可以使用内存映射并有效地支付零成本。该协议用于很多事情,例如spark sql和python之间的流数据,用于针对spark sql数据块运行pandas函数,这些被称为“ pandas udfs”。

arrow是为内存而设计的(但是您可以将其放在磁盘上,然后再进行内存映射)。它们旨在相互兼容,并在应用程序中一起使用,而其竞争对手apache parquet文件是为磁盘存储而设计的。

优点:apache arrow为平面和分层数据定义了一种独立于语言的列式存储格式,该格式组织为在cpu和gpu等现代硬件上进行高效的分析操作而组织。arrow存储器格式还支持零拷贝读取,以实现闪电般的数据访问,而无需序列化开销。

java的apache arrow

导入库:

  <dependency>      <groupid>org.apache.arrow</groupid>      <artifactid>arrow-memory-netty</artifactid>      <version>${arrow.version}</version>  </dependency>  <dependency>      <groupid>org.apache.arrow</groupid>      <artifactid>arrow-vector</artifactid>      <version>${arrow.version}</version>  </dependency>

在开始之前,必须了解对于arrow的读/写操作,使用了字节缓冲区。诸如读取和写入之类的操作是字节的连续交换。为了提高效率,arrow附带了一个缓冲区分配器,该缓冲区分配器可以具有一定的大小,也可以具有自动扩展功能。支持分配管理的库是arrow-memory-netty和arrow-memory-unsafe。我们这里使用netty。

用arrow存储数据需要一个模式,模式可以通过编程定义:

  package com.gkatzioura.arrow;    import java.io.ioexception;    import java.util.list;    import org.apache.arrow.vector.types.pojo.arrowtype;    import org.apache.arrow.vector.types.pojo.field;    import org.apache.arrow.vector.types.pojo.fieldtype;    import org.apache.arrow.vector.types.pojo.schema;    public class schemafactory {    public static schema default_schema = createdefault();    public static schema createdefault() {    var strfield = new field("col1", fieldtype.nullable(new arrowtype.utf8()), null);    var intfield = new field("col2", fieldtype.nullable(new arrowtype.int(32, true)), null);    return new schema(list.of(strfield, intfield));    }    public static schema schemawithchildren() {    var amount = new field("amount", fieldtype.nullable(new arrowtype.decimal(19,4,128)), null);    var currency = new field("currency",fieldtype.nullable(new arrowtype.utf8()), null);    var itemfield = new field("item", fieldtype.nullable(new arrowtype.utf8()), list.of(amount,currency));    return new schema(list.of(itemfield));    }    public static schema fromjson(string jsonstring) {    try {    return schema.fromjson(jsonstring);    } catch (ioexception e) {    throw new arrowexampleexception(e);    }    }    }

他们也有一个可解析的json表示形式:

  {    "fields" : [ {      "name" : "col1",      "nullable" : true,      "type" : {        "name" : "utf8"      },      "children" : [ ]    }, {      "name" : "col2",      "nullable" : true,      "type" : {        "name" : "int",        "bitwidth" : 32,        "issigned" : true      },      "children" : [ ]    } ]  }

另外,就像avro一样,您可以在字段上设计复杂的架构和嵌入式值:

  public static schema schemawithchildren() {      var amount = new field("amount", fieldtype.nullable(new arrowtype.decimal(19,4,128)), null);      var currency = new field("currency",fieldtype.nullable(new arrowtype.utf8()), null);      var itemfield = new field("item", fieldtype.nullable(new arrowtype.utf8()), list.of(amount,currency));         return new schema(list.of(itemfield));  }  

基于上面的的schema,我们将为我们的类创建一个dto:

  package com.gkatzioura.arrow;     import lombok.builder;  import lombok.data;     @data  @builder  public class defaultarrowentry {         private string col1;      private integer col2;     }  

我们的目标是将这些java对象转换为arrow字节流。

1. 使用分配器创建 directbytebuffer

这些缓冲区是 的 。您确实需要释放所使用的内存,但是对于库用户而言,这是通过在分配器上执行 close() 操作来完成的。在我们的例子中,我们的类将实现 closeable 接口,该接口将执行分配器关闭操作。

通过使用流api,数据将被流传输到使用arrow格式提交的outputstream:

  package com.gkatzioura.arrow;     import java.io.closeable;  import java.io.ioexception;  import java.nio.channels.writablebytechannel;  import java.util.list;     import org.apache.arrow.memory.rootallocator;  import org.apache.arrow.vector.intvector;  import org.apache.arrow.vector.varcharvector;  import org.apache.arrow.vector.vectorschemaroot;  import org.apache.arrow.vector.dictionary.dictionaryprovider;  import org.apache.arrow.vector.ipc.arrowstreamwriter;  import org.apache.arrow.vector.util.text;     import static com.gkatzioura.arrow.schemafactory.default_schema;     public class defaultentrieswriter implements closeable {         private final rootallocator rootallocator;      private final vectorschemaroot vectorschemaroot;//向量分配器创建:         public defaultentrieswriter() {          rootallocator = new rootallocator();          vectorschemaroot = vectorschemaroot.create(default_schema, rootallocator);      }         public void write(list<defaultarrowentry> defaultarrowentries, int batchsize, writablebytechannel out) {          if (batchsize <= 0) {              batchsize = defaultarrowentries.size();          }             dictionaryprovider.mapdictionaryprovider dictprovider = new dictionaryprovider.mapdictionaryprovider();          try(arrowstreamwriter writer = new arrowstreamwriter(vectorschemaroot, dictprovider, out)) {              writer.start();                 varcharvector childvector1 = (varcharvector) vectorschemaroot.getvector(0);              intvector childvector2 = (intvector) vectorschemaroot.getvector(1);              childvector1.reset();              childvector2.reset();                 boolean exactbatches = defaultarrowentries.size()%batchsize == 0;              int batchcounter = 0;                 for(int i=0; i < defaultarrowentries.size(); i++) {                  childvector1.setsafe(batchcounter, new text(defaultarrowentries.get(i).getcol1()));                  childvector2.setsafe(batchcounter, defaultarrowentries.get(i).getcol2());                     batchcounter++;                     if(batchcounter == batchsize) {                      vectorschemaroot.setrowcount(batchsize);                      writer.writebatch();                      batchcounter = 0;                  }              }                 if(!exactbatches) {                  vectorschemaroot.setrowcount(batchcounter);                  writer.writebatch();              }                 writer.end();          } catch (ioexception e) {              throw new arrowexampleexception(e);          }      }         @override      public void close() throws ioexception {          vectorschemaroot.close();          rootallocator.close();      }     }

为了在arrow上显示批处理的支持,已在函数中实现了简单的批处理算法。对于我们的示例,只需考虑将数据分批写入。

让我们深入了解上面代码功能:

向量分配器创建:

  public defaultentriestobytesconverter() {      rootallocator = new rootallocator();      vectorschemaroot = vectorschemaroot.create(default_schema, rootallocator);  }  

然后在写入流时,实现并启动了arrow流编写器

  arrowstreamwriter writer = new arrowstreamwriter(vectorschemaroot, dictprovider, channels.newchannel(out));  writer.start();  

我们将数据填充向量,然后还重置它们,但让预分配的缓冲区 存在 :

  varcharvector childvector1 = (varcharvector) vectorschemaroot.getvector(0);  intvector childvector2 = (intvector) vectorschemaroot.getvector(1);  childvector1.reset();  childvector2.reset();  

写入数据时,我们使用 setsafe 操作。如果需要分配更多的缓冲区,应采用这种方式。对于此示例,此操作在每次写入时都完成,但是在考虑了所需的操作和缓冲区大小后可以避免:

  childvector1.setsafe(i, new text(defaultarrowentries.get(i).getcol1()));  childvector2.setsafe(i, defaultarrowentries.get(i).getcol2());  

然后,将批处理写入流中:

  vectorschemaroot.setrowcount(batchsize);  writer.writebatch();  

最后但并非最不重要的一点是,我们关闭了writer:

  @override  public void close() throws ioexception {      vectorschemaroot.close();      rootallocator.close();  }  

以上就是jvm上高性能数据格式库包apache arrow入门和架构详解(gkatziouras)的详细内容,更多关于apache arrow入门的资料请关注<计算机技术网(www.ctvol.com)!!>其它相关文章!

需要了解更多数据库技术:JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras),都可以关注数据库技术分享栏目—计算机技术网(www.ctvol.com)!

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/dtteaching/633202.html

(0)
上一篇 2021年5月31日
下一篇 2021年5月31日

精彩推荐