Java org.apache.avro.io.BinaryDecoder 代码实例

・18 分钟阅读

以下是展示如何使用org.apache.avro.io.BinaryDecoder的最佳示例。 我们使用了代码质量辨别算法从开源项目中提取出了最佳的优秀示例。

实例 1


/** Return the remote protocol.  Force a handshake if required. */
public Protocol getRemote() throws IOException {
  if (remote != null) return remote;            // already have it
  MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
  if (remoteHash != null) {
    remote = REMOTE_PROTOCOLS.get(remoteHash);
    if (remote != null) return remote;            // already cached
  }
  handshakeLock.lock();
  try {
    // force handshake
    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
    // direct because the payload is tiny.
    Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
    writeHandshake(out);
    out.writeInt(0);                              // empty metadata
    out.writeString("");                          // bogus message name
    List<ByteBuffer> response =
      getTransceiver().transceive(bbo.getBufferList());
    ByteBufferInputStream bbi = new ByteBufferInputStream(response);
    BinaryDecoder in =
      DecoderFactory.get().binaryDecoder(bbi, null);
    readHandshake(in);
    return this.remote;
  } finally {
    handshakeLock.unlock();
  }
}
 

实例 2


public static <V> V fromBytes(byte[] bytes, Class<V> cls) throws Exception{
      SpecificDatumReader<V> reader = new SpecificDatumReader<V>(cls);
        BinaryDecoder binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
        V val = cls.newInstance();
        reader.read(val, binDecoder);
        return val;
}
 

实例 3


@Override
public Object process(AsyncRequestor requestor, AsyncRequestor.Request request, String message,
    List<ByteBuffer> response) throws Exception {
  ByteBufferInputStream bbi = new ByteBufferInputStream(response);
  BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
  HandshakeResponse handshake = AvroConstants.HANDSHAKE_RESPONSE_READER.read(null, in);
  Protocol localProtocol = requestor.getLocal();
  Protocol serverProtocol;
  if (handshake.getMatch() == HandshakeMatch.BOTH) {
    serverProtocol = localProtocol;
  } else {
    serverProtocol = Protocol.parse(handshake.getServerProtocol());
  }
  RPCContext context = request.getContext();
  RPCContextHelper.setResponseCallMeta(context, AvroConstants.META_READER.read(null, in));
  if (!in.readBoolean()) {
    Schema localSchema = localProtocol.getMessages().get(message).getResponse();
    Schema remoteSchema = serverProtocol.getMessages().get(message).getResponse();
    Object responseObject = new SpecificDatumReader<>(remoteSchema, localSchema).read(null, in);
    RPCContextHelper.setResponse(context, responseObject);
    requestor.getRPCPlugins().forEach(plugin -> plugin.clientReceiveResponse(context));
    return responseObject;
  } else {
    Schema localSchema = localProtocol.getMessages().get(message).getErrors();
    Schema remoteSchema = serverProtocol.getMessages().get(message).getErrors();
    Object error = new SpecificDatumReader<>(remoteSchema, localSchema).read(null, in);
    Exception exception;
    if (error instanceof Exception) {
      exception = (Exception) error;
    } else {
      exception = new AvroRuntimeException(error.toString());
    }
    RPCContextHelper.setError(context, exception);
    requestor.getRPCPlugins().forEach(plugin -> plugin.clientReceiveResponse(context));
    throw exception;
  }
}
 

实例 4


/**
 * Deserializes a single object based on the given Schema.
 * @param writer writer's schema
 * @param bytes Array to deserialize from
 * @param ob An empty object to deserialize into (must not be null).
 * @throws IOException
 */
public static <T extends SpecificRecord> T deserialize(Schema writer, ByteBuffer bytes, T ob) throws IOException
{
    BinaryDecoder dec = DIRECT_DECODERS.createBinaryDecoder(ByteBufferUtil.getArray(bytes), null);
    SpecificDatumReader<T> reader = new SpecificDatumReader<T>(writer);
    reader.setExpected(ob.getSchema());
    return reader.read(ob, dec);
}
 

实例 5


/**
 * Determine if the current Avro version can use the ReflectDatumReader to
 * read SpecificData that includes an array. The inability to do this was a
 * bug that was fixed in Avro 1.7.0.
 * 
 * @return true if SpecificData can be properly read using a
 *         ReflectDatumReader
 */
static boolean canDecodeSpecificSchemaWithReflectDatumReader() {
  ReflectDatumReader<Record> datumReader = new ReflectDatumReader(Record.SCHEMA$);
  ReflectDatumWriter<Record> datumWriter = new ReflectDatumWriter(Record.SCHEMA$);
  Record record = new Record();
  record.subrecords = Lists.<CharSequence> newArrayList("a", "b");
  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
  try {
    datumWriter.write(record, encoder);
    encoder.flush();
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
        byteArrayOutputStream.toByteArray(), null);
    datumReader.read(record, decoder);
  } catch (IOException ioe) {
    throw new RuntimeException("Error performing specific schema test", ioe);
  } catch (ClassCastException cce) {
    // This indicates that we're using a pre-1.7.0 version of Avro, as the
    // ReflectDatumReader in those versions could not correctly handle an
    // array in a SpecificData value
    return false;
  }
  return true;
}
 

实例 6


public synchronized void recieveMessages(Message message) {
        final DatumReader<B5MEvent> reader = new SpecificDatumReader<B5MEvent>(
                        B5MEvent.SCHEMA$);
        final B5MEvent b5mEvent = new B5MEvent();
        byte[] data = message.getData();
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        try {
                reader.read(b5mEvent, decoder);
                for (LaserMessageConsumer consumer : this.consumer) {
                        consumer.write(b5mEvent);
                }
        } catch (Exception e) {
                e.printStackTrace();
        }
}
 

实例 7


private static List<ExampleEvent> readRecords(KafkaStream<byte[],byte[]> stream, int expected) throws IOException {
    List<ExampleEvent> records = com.clearspring.analytics.util.Lists.newArrayList();
    Iterator it = stream.iterator();
    // Use loader for the expected event to make sure it is visible.
    SpecificData data = new SpecificData(ExampleEvent.class.getClassLoader());
    DatumReader<ExampleEvent> reader = data.createDatumReader(ExampleEvent.getClassSchema());
    for (int i = 0; i < expected; ++i) {
      MessageAndMetadata<byte[],byte[]> message = (MessageAndMetadata<byte[], byte[]>) it.next();
      BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message.message(),  null);
      ExampleEvent record = reader.read(null, binaryDecoder);
      records.add(record);
    }
    return records;
  }
 

实例 8


@Test
public void resetTest() throws IOException {
  File tempFile = newTestFile(true);
  String target = tempFile.getAbsolutePath();
  logger.info("Target: {}", target);
  TransientPositionTracker tracker = new TransientPositionTracker(target);
  AvroEventDeserializer.Builder desBuilder =
      new AvroEventDeserializer.Builder();
  EventDeserializer deserializer = desBuilder.build(new Context(),
      new ResettableFileInputStream(tempFile, tracker));
  BinaryDecoder decoder = null;
  DatumReader<GenericRecord> reader =
      new GenericDatumReader<GenericRecord>(schema);
  decoder = DecoderFactory.get().binaryDecoder(
      deserializer.readEvent().getBody(), decoder);
  assertEquals("bar", reader.read(null, decoder).get("foo").toString());
  deserializer.reset();
  decoder = DecoderFactory.get().binaryDecoder(
      deserializer.readEvent().getBody(), decoder);
  assertEquals("bar", reader.read(null, decoder).get("foo").toString());
  deserializer.mark();
  decoder = DecoderFactory.get().binaryDecoder(
      deserializer.readEvent().getBody(), decoder);
  assertEquals("baz", reader.read(null, decoder).get("foo").toString());
  deserializer.reset();
  decoder = DecoderFactory.get().binaryDecoder(
      deserializer.readEvent().getBody(), decoder);
  assertEquals("baz", reader.read(null, decoder).get("foo").toString());
  assertNull(deserializer.readEvent());
}
 

实例 9


private void parseNfSubscriptions() {
    if(state.getProperty(NF_SUBSCRIPTIONS) != null){
        byte[] data = base64.decodeBase64(state.getProperty(NF_SUBSCRIPTIONS));
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        SpecificDatumReader<TopicSubscriptionInfo> avroReader = new SpecificDatumReader<TopicSubscriptionInfo>(
                TopicSubscriptionInfo.class);
        try { // NOSONAR
            TopicSubscriptionInfo decodedInfo = null;
            while (!decoder.isEnd()) {
                decodedInfo = avroReader.read(null, decoder);
                LOG.debug("Loaded {}", decodedInfo);
                nfSubscriptions.put(decodedInfo.getTopicInfo().getId(), decodedInfo);
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception occurred while reading information from decoder", e);
        }
    }else{
        LOG.info("No subscription info found in state");
    }
}
 

实例 10


protected Decoder build(Decoder reuse) throws Exception {
        Decoder decoder = null;
        if (useBinary) {
            Assert.notNull(this.inputStream, "you've selected to use a binary decoder. " +
                                                     "Please provide the input stream to decode by " +
                                                     "setting the 'inputStream' property");
            decoder = decoderFactory.binaryDecoder(this.inputStream, (BinaryDecoder) reuse);
        } else if (useJson) {
            Assert.notNull(this.schema, "you've selected to use JSON. Please provide a target schema by setting the 'schema' property");
            Assert.isTrue(StringUtils.hasText(this.input) || this.inputStream != null, "there must be either an inputStream or an inputString to build a JsonDecoder");
            decoder = StringUtils.hasText(this.input) ?
                              decoderFactory.jsonDecoder(this.schema, this.input) :
                              decoderFactory.jsonDecoder(this.schema, this.inputStream);
        }
        Assert.notNull(decoder, "could not build a decoder. Did you set both 'useJson' and 'userBinary' to false?");
        if (validate) {
            Assert.notNull(this.schema, "you've selected to validate. Please provide a target schema by setting the 'schema' property");
            decoder = decoderFactory.validatingDecoder(this.schema, decoder);
        }
        return decoder;
    }
 

实例 11


@Override
public void deserializeWalletExtension(Wallet containingWallet, byte[] data) throws Exception {
    ReflectDatumReader<KeyShareWalletExtension> datumReader = new ReflectDatumReader<>(KeyShareWalletExtension.class);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    KeyShareWalletExtension result = datumReader.read(null, decoder);
    this.privateKey = result.privateKey;
    this.otherPublicKey = result.otherPublicKey;
    this.pkpDesktop = result.pkpDesktop;
    this.pkpPhone = result.pkpPhone;
    this.desktopBCParameters = result.desktopBCParameters;
    this.phoneBCParameters = result.phoneBCParameters;
}
 

实例 12


/**
 * Deserializes a single object based on the given Schema.
 * @param writer writer's schema
 * @param bytes Array to deserialize from
 * @param ob An empty object to deserialize into (must not be null).
 * @throws IOException
 */
public static <T extends SpecificRecord> T deserialize(Schema writer, byte[] bytes, T ob) throws IOException
{
    BinaryDecoder dec = DIRECT_DECODERS.createBinaryDecoder(bytes, null);
    SpecificDatumReader<T> reader = new SpecificDatumReader<T>(writer);
    reader.setExpected(ob.getSchema());
    return reader.read(ob, dec);
}
 

实例 13


@Override
protected Optional<ExperimentSpace> deserialize(ExperimentSpace.Serialized serialized) throws IOException {
  ExperimentDeployment merged = null;
  ExperimentDeployment curr = null;
  if (avroFileInput) {
    for (InputSupplier<? extends InputStream> is : serialized.getSerializedData()) {
      SeekableInput si = new SeekableByteArrayInput(ByteStreams.toByteArray(is));
      FileReader<ExperimentDeployment> dfr = DataFileReader.openReader(si, reader);
      while (dfr.hasNext()) {
        merged = merge(merged, dfr.next(curr));
      }
    }
  } else {
    BinaryDecoder decoder = null;
    for (InputSupplier<? extends InputStream> is : serialized.getSerializedData()) {
      decoder = DecoderFactory.get().binaryDecoder(is.getInput(), decoder);
      merged = merge(merged, reader.read(curr, decoder));
    }
  }
  return merged == null ? Optional.<ExperimentSpace>absent() :
      Optional.fromNullable(load(merged, serialized.getVersionIdentifier()));
}
 

实例 14


@SuppressWarnings("deprecation")
protected Decoder createDecoder() throws IOException {
  switch(codecType) {
    case BINARY:
      return new BinaryDecoder(getOrCreateInputStream());
    case JSON:
      return new JsonDecoder(schema, getOrCreateInputStream());
  }
  return null;
}
 

实例 15


@Override
public void init(StarTreeConfig starTreeConfig, ThirdEyeKafkaConfig kafkaConfig) throws Exception
{
  this.starTreeConfig = starTreeConfig;
  this.decoderThreadLocal = new ThreadLocal<BinaryDecoder>();
  // Get schema
  String schemaUri = kafkaConfig.getDecoderConfig().getProperty(PROP_SCHEMA_URI);
  if (schemaUri == null)
  {
    throw new IllegalStateException("Must provide " + PROP_SCHEMA_URI);
  }
  InputStream schemaInputStream = URI.create(schemaUri).toURL().openStream();
  Schema schema = Schema.parse(schemaInputStream);
  schemaInputStream.close();
  LOG.info("Init decoder for {} with schema {}", starTreeConfig.getCollection(), schema);
  // Set decoder
  this.datumReader = new GenericDatumReader<GenericRecord>(schema);
  // Set any record offset
  String recordOffsetProp = kafkaConfig.getDecoderConfig().getProperty(PROP_RECORD_OFFSET);
  if (recordOffsetProp != null)
  {
    this.recordOffset = Integer.valueOf(recordOffsetProp);
  }
}
 
讨论
淘淘あ西西 profile image