寒玉 Blog
  • Home
  • Books
  • About Me
  • Categories
  • Tags
  • Archives

使用DruidAPI dump Druid数据


在使用druid的时候,有时候需要更新druid的数据,一般通用的做法是dump出原始数据,进行数据更新,然后再重新load进去.目前需要dump出数据的时候,需要使用overload,需要使用命令行工具进行操作,dump到文件中.可是当数据量比较大,需要并行操作的时候,就不好处理了,因为没有可用的api.昨天翻了下源码,修改了下DumpSegment工具,可以实现类似API的调用.

package io.druid.cli;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularities;
import io.druid.guice.annotations.Json;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.filter.Filters;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.chrono.ISOChronology;
import org.roaringbitmap.IntIterator;

public class TestDumpSegment3
{
  private static final Logger log = new Logger(TestDumpSegment3.class);

  /** Segment loader used by {@code main}; assigned once in the static initializer below. */
  public static IndexIO indexIO;

  /** Column/processing configuration passed to the {@link IndexIO} constructor. */
  public static ColumnConfig columnConfig;

  /** JSON mapper used to parse {@code filterJson} and to serialize dumped rows. */
  public static ObjectMapper objectMapper;

  static {
    // Build the collaborators by hand so the class can be used without a Guice injector.
    final DruidProcessingConfig processingConfig = new DruidProcessingConfig()
    {
      @Override
      public String getFormatString()
      {
        return "processing-%s";
      }

      @Override
      public int intermediateComputeSizeBytes()
      {
        return 100 * 1024 * 1024;
      }

      @Override
      public int getNumThreads()
      {
        return 1;
      }

      @Override
      public int columnCacheSizeBytes()
      {
        return 25 * 1024 * 1024;
      }
    };

    columnConfig = processingConfig;
    objectMapper = new DefaultObjectMapper();
    indexIO = new IndexIO(objectMapper, columnConfig);
  }

  /**
   * Kinds of dump this class knows by name.
   *
   * <p>Within this file only {@code ROWS} is referenced, as the default value of
   * {@code dumpTypeString}. The constants presumably correspond to {@code runDump},
   * {@code runMetadata} and {@code runBitmaps} respectively, but nothing here dispatches on them.
   */
  private enum DumpType
  {
    ROWS,
    METADATA,
    BITMAPS
  }

  /** Creates a dumper with default options; callers configure it through the public fields below. */
  public TestDumpSegment3()
  {
  }

  /** Segment directory option; not read by any method in this class. */
  public String directory;

  /** Target file for {@code withOutputStream}; when {@code null}, output goes to {@code System.out}. */
  public String outputFileName;

  /** Optional filter as JSON, parsed into a {@code DimFilter} by {@code runDump}; {@code null} means no filter. */
  public String filterJson;

  /** Columns to include; an empty list means "time column plus every column of the segment". */
  public List<String> columnNamesFromCli = Lists.newArrayList();

  /** When {@code true}, {@code runDump} renders the time column as an ISO-8601 string in UTC. */
  public boolean timeISO8601;

  /** Name of the dump type; defaults to {@code ROWS} and is not read by any method in this class. */
  public String dumpTypeString = DumpType.ROWS.name();

  /** When {@code true}, {@code runBitmaps} writes row numbers instead of serialized bitmap bytes. */
  public boolean decompressBitmaps;


  /**
   * Demo entry point: loads one segment and prints every row of it to standard output.
   *
   * <p>Rows are produced by {@link #runDump(QueryableIndex)} and printed one per line; no
   * output file is written.
   *
   * @param args optional; {@code args[0]} is the directory containing the segment's smoosh
   *             files. When absent, the sample path used by the original demo is loaded.
   */
  public static void main(String[] args) {
    // Directory containing the smoosh files of the segment to dump.
    final String segmentDirectory =
        args != null && args.length > 0 ? args[0] : "/Users/xixuebin/Downloads/0";

    final TestDumpSegment3 dumper = new TestDumpSegment3();

    // The index holds open file resources, so it is closed as soon as the rows are printed.
    try (final QueryableIndex index = indexIO.loadIndex(new File(segmentDirectory))) {
      for (String line : dumper.runDump(index)) {
        System.out.println("args = [" + line + "]");
      }
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }


  /**
   * Runs a segment-metadata query with every analysis type enabled against {@code index} and
   * writes each resulting {@link SegmentAnalysis} as JSON to the stream chosen by
   * {@code withOutputStream}.
   *
   * @param injector source of the JSON mapper and of the query runner machinery
   * @param index    segment to analyze
   * @throws IOException if the output file cannot be opened or closed
   */
  private void runMetadata(final Injector injector, final QueryableIndex index) throws IOException
  {
    // Configure a copy of the injected mapper: writeValue must not close the shared output
    // stream after the first analysis.
    final ObjectMapper injectedMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
    final ObjectMapper nonClosingMapper =
        injectedMapper.copy().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);

    final SegmentMetadataQuery query = new SegmentMetadataQuery(
        new TableDataSource("dataSource"),
        new SpecificSegmentSpec(new SegmentDescriptor(index.getDataInterval(), "0", 0)),
        new ListColumnIncluderator(getColumnsToInclude(index)),
        false,
        null,
        EnumSet.allOf(SegmentMetadataQuery.AnalysisType.class),
        false,
        false
    );

    final Function<OutputStream, Object> dumpAnalyses = new Function<OutputStream, Object>()
    {
      @Override
      public Object apply(final OutputStream sink)
      {
        final Sequence<SegmentAnalysis> analyses = executeQuery(injector, index, query);

        final Function<SegmentAnalysis, Object> writeAnalysis = new Function<SegmentAnalysis, Object>()
        {
          @Override
          public Object apply(SegmentAnalysis analysis)
          {
            try {
              nonClosingMapper.writeValue(sink, analysis);
              return null;
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
          }
        };

        // The mapped sequence is lazy; draining it is what performs the writes.
        evaluateSequenceForSideEffects(Sequences.map(analyses, writeAnalysis));
        return null;
      }
    };

    withOutputStream(dumpAnalyses);
  }

  /**
   * Reads every row of {@code index} that matches the optional {@code filterJson} filter and
   * returns each row serialized as a JSON object string.
   *
   * <p>The columns of each row are those returned by {@code getColumnsToInclude}, in that
   * order. When {@code timeISO8601} is set, the time column is rendered as an ISO-8601 string
   * in UTC instead of its raw value.
   *
   * @param index the loaded segment to dump
   * @return one JSON string per row, in cursor order
   * @throws IOException      if {@code filterJson} cannot be parsed as a {@link DimFilter}
   * @throws RuntimeException wrapping the {@link JsonProcessingException} if a row cannot be
   *                          serialized; the dump fails rather than silently omitting the row
   */
  public List<String> runDump(final QueryableIndex index) throws IOException
  {
    final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(index);
    final List<String> columnNames = getColumnsToInclude(index);
    final DimFilter filter = filterJson != null ? objectMapper.readValue(filterJson, DimFilter.class) : null;
    final List<String> result = Lists.newArrayList();
    final Sequence<Cursor> cursors = adapter.makeCursors(
        Filters.toFilter(filter),
        index.getDataInterval().withChronology(ISOChronology.getInstanceUTC()),
        QueryGranularities.ALL,
        false
    );

    final Sequence<Object> sequence = Sequences.map(
        cursors,
        new Function<Cursor, Object>()
        {
          @Override
          public Object apply(Cursor cursor)
          {
            // One selector per column, created once per cursor and re-read for every row.
            final List<ObjectColumnSelector> selectors = Lists.newArrayList();
            for (String columnName : columnNames) {
              selectors.add(makeSelector(columnName, index.getColumn(columnName), cursor));
            }

            while (!cursor.isDone()) {
              // LinkedHashMap keeps the JSON fields in the requested column order.
              final Map<String, Object> row = Maps.newLinkedHashMap();

              for (int i = 0; i < columnNames.size(); i++) {
                final String columnName = columnNames.get(i);
                final Object value = selectors.get(i).get();

                if (timeISO8601 && Column.TIME_COLUMN_NAME.equals(columnName)) {
                  row.put(columnName, new DateTime(value, DateTimeZone.UTC).toString());
                } else {
                  row.put(columnName, value);
                }
              }

              try {
                result.add(objectMapper.writeValueAsString(row));
              }
              catch (JsonProcessingException e) {
                // Dropping the row would hand the caller an incomplete dump without any signal.
                throw Throwables.propagate(e);
              }

              cursor.advance();
            }

            return null;
          }
        }
    );

    // The mapped sequence is lazy; draining it is what fills `result`.
    evaluateSequenceForSideEffects(sequence);

    return result;
  }

  /**
   * Writes the bitmap indexes of the selected columns as one JSON document to the stream chosen
   * by {@code withOutputStream}.
   *
   * <p>The document has the shape
   * {@code {"bitmapSerdeFactory": ..., "bitmaps": {column: {value: bitmap, ...} | null, ...}}}.
   * A column without a bitmap index is written as {@code null}.
   *
   * @param injector source of the JSON mapper
   * @param index    segment whose bitmap indexes are dumped
   * @throws IOException if the output file cannot be opened or closed
   * @throws ISE         if the segment's bitmap factory is neither Concise nor Roaring
   */
  private void runBitmaps(final Injector injector, final QueryableIndex index) throws IOException
  {
    final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
    final BitmapFactory factory = index.getBitmapFactoryForDimensions();

    // Pick the serde that matches the bitmap implementation used by the segment.
    final BitmapSerdeFactory serde;
    if (factory instanceof ConciseBitmapFactory) {
      serde = new ConciseBitmapSerdeFactory();
    } else if (factory instanceof RoaringBitmapFactory) {
      serde = new RoaringBitmapSerdeFactory(null);
    } else {
      throw new ISE(
          "Don't know which BitmapSerdeFactory to use for BitmapFactory[%s]!",
          factory.getClass().getName()
      );
    }

    final List<String> columns = getColumnsToInclude(index);

    final Function<OutputStream, Object> writeDocument = new Function<OutputStream, Object>()
    {
      @Override
      public Object apply(final OutputStream out)
      {
        try {
          final JsonGenerator json = jsonMapper.getFactory().createGenerator(out);

          json.writeStartObject();
          json.writeObjectField("bitmapSerdeFactory", serde);
          json.writeFieldName("bitmaps");
          json.writeStartObject();

          for (final String name : columns) {
            final BitmapIndex bitmapIndex = index.getColumn(name).getBitmapIndex();

            if (bitmapIndex != null) {
              json.writeFieldName(name);
              writeColumnBitmaps(json, bitmapIndex, serde);
            } else {
              json.writeNullField(name);
            }
          }

          json.writeEndObject();
          json.writeEndObject();
          json.close();
        }
        catch (IOException e) {
          throw Throwables.propagate(e);
        }

        return null;
      }
    };

    withOutputStream(writeDocument);
  }

  /**
   * Writes one JSON object mapping each dictionary value of a column to its bitmap, either as
   * an array of row numbers ({@code decompressBitmaps}) or as the serde's binary encoding.
   */
  private void writeColumnBitmaps(
      final JsonGenerator json,
      final BitmapIndex bitmapIndex,
      final BitmapSerdeFactory serde
  ) throws IOException
  {
    json.writeStartObject();

    for (int i = 0; i < bitmapIndex.getCardinality(); i++) {
      // A null dictionary value is emitted under the empty-string key.
      json.writeFieldName(Strings.nullToEmpty(bitmapIndex.getValue(i)));
      final ImmutableBitmap bitmap = bitmapIndex.getBitmap(i);

      if (!decompressBitmaps) {
        json.writeBinary(serde.getObjectStrategy().toBytes(bitmap));
        continue;
      }

      json.writeStartArray();
      for (final IntIterator rows = bitmap.iterator(); rows.hasNext(); ) {
        json.writeNumber(rows.next());
      }
      json.writeEndArray();
    }

    json.writeEndObject();
  }

  /**
   * Resolves the list of columns to dump for {@code index}.
   *
   * <p>With no columns requested in {@code columnNamesFromCli}, the result is the time column
   * followed by every column of the segment. Otherwise it is the requested columns, with
   * duplicates and columns missing from the segment dropped, in requested order.
   *
   * @param index segment whose columns are consulted
   * @return immutable list of column names
   */
  private List<String> getColumnsToInclude(final QueryableIndex index)
  {
    // A LinkedHashSet removes duplicates while keeping first-seen order.
    final Set<String> requested = Sets.newLinkedHashSet(columnNamesFromCli);

    if (requested.isEmpty()) {
      // Nothing requested => include all columns, time column first.
      requested.add(Column.TIME_COLUMN_NAME);
      Iterables.addAll(requested, index.getColumnNames());
      return ImmutableList.copyOf(requested);
    }

    // Keep only the requested columns that exist in this segment.
    final List<String> present = Lists.newArrayList();
    for (String name : ImmutableList.copyOf(requested)) {
      if (index.getColumn(name) != null) {
        present.add(name);
      }
    }

    return ImmutableList.copyOf(present);
  }

  /**
   * Applies {@code f} to the configured output stream and returns its result.
   *
   * <p>When {@code outputFileName} is set, a new {@link FileOutputStream} on that file is
   * opened for the call and closed afterwards; otherwise {@code System.out} is passed and
   * left open.
   *
   * @param f the work to perform on the stream
   * @return whatever {@code f} returns
   * @throws IOException if the output file cannot be opened or closed
   */
  private <T> T withOutputStream(Function<OutputStream, T> f) throws IOException
  {
    if (outputFileName != null) {
      try (final OutputStream fileOut = new FileOutputStream(outputFileName)) {
        return f.apply(fileOut);
      }
    }

    return f.apply(System.out);
  }


  /**
   * Runs {@code query} against a single in-memory segment built from {@code index}.
   *
   * @param injector source of the {@link QueryRunnerFactoryConglomerate}
   * @param index    the segment data; wrapped in a {@link QueryableIndexSegment} named "segment"
   * @param query    the query to run
   * @return the merged result sequence produced by the query's factory and toolchest
   */
  private static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
  {
    // The conglomerate maps a query to the factory that knows how to run it.
    final QueryRunnerFactoryConglomerate conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class);
    // Raw type: the factory's type parameters depend on the concrete query class.
    final QueryRunnerFactory factory = conglomerate.findFactory(query);
    final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment("segment", index));
    // Merge the single runner using a same-thread executor, then apply the toolchest's result merging.
    final Sequence results = factory.getToolchest().mergeResults(
        factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner>of(runner))
    ).run(query, Maps.<String, Object>newHashMap());
    // Unchecked cast: the raw factory above erased the element type.
    return (Sequence<T>) results;
  }

  /**
   * Drains {@code sequence}, discarding every element.
   *
   * <p>Callers use this to trigger the side effects of the functions mapped over the sequence.
   *
   * @param sequence the sequence to consume
   */
  private static <T> void evaluateSequenceForSideEffects(final Sequence<T> sequence)
  {
    // An accumulator that keeps no state: every element is simply dropped.
    final Accumulator<Object, T> discardAll = new Accumulator<Object, T>()
    {
      @Override
      public Object accumulate(Object ignoredState, T ignoredElement)
      {
        return null;
      }
    };

    sequence.accumulate(null, discardAll);
  }

  /**
   * Builds a selector that reads the current row's value of {@code columnName} as an object.
   *
   * <ul>
   *   <li>Column with a dictionary encoding, multi-valued: a {@code List<String>} of the looked-up
   *       names, or {@code null} for an empty row.</li>
   *   <li>Column with a dictionary encoding, single-valued: the first looked-up name, or
   *       {@code null} for an empty row.</li>
   *   <li>Any other column: the factory's object selector, or a selector that always returns
   *       {@code null} (with a warning logged) when the factory cannot create one.</li>
   * </ul>
   *
   * @param columnName            name of the column to read
   * @param column                the column, consulted for its dictionary encoding
   * @param columnSelectorFactory factory (the cursor) that creates the underlying selectors
   * @return a selector for the column; never {@code null}
   */
  private static ObjectColumnSelector makeSelector(
      final String columnName,
      final Column column,
      final ColumnSelectorFactory columnSelectorFactory
  )
  {
    if (column.getDictionaryEncoding() == null) {
      final ObjectColumnSelector objectSelector = columnSelectorFactory.makeObjectColumnSelector(columnName);
      if (objectSelector != null) {
        return objectSelector;
      }

      // Selector failed to create (unrecognized column type?)
      log.warn("Could not create selector for column[%s], returning null.", columnName);
      return new ObjectColumnSelector()
      {
        @Override
        public Class classOfObject()
        {
          return Object.class;
        }

        @Override
        public Object get()
        {
          return null;
        }
      };
    }

    // Dictionary-encoded column: read values through a dimension selector.
    final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(
        new DefaultDimensionSpec(columnName, columnName)
    );

    if (!column.getDictionaryEncoding().hasMultipleValues()) {
      return new ObjectColumnSelector<String>()
      {
        @Override
        public Class<String> classOfObject()
        {
          return String.class;
        }

        @Override
        public String get()
        {
          final IndexedInts ids = dimensionSelector.getRow();
          if (ids.size() == 0) {
            return null;
          }
          return dimensionSelector.lookupName(ids.get(0));
        }
      };
    }

    // Multi-value columns are always returned as a list.
    return new ObjectColumnSelector<List>()
    {
      @Override
      public Class<List> classOfObject()
      {
        return List.class;
      }

      @Override
      public List<String> get()
      {
        final IndexedInts ids = dimensionSelector.getRow();
        final int valueCount = ids.size();
        if (valueCount == 0) {
          return null;
        }

        final List<String> names = Lists.newArrayList();
        for (int position = 0; position < valueCount; position++) {
          names.add(dimensionSelector.lookupName(ids.get(position)));
        }
        return names;
      }
    };
  }
}

使用该工具可以直接获取到dump文件的结果.例如在spark中引入该类,直接读取smoosh文件,经处理之后,输出parquet文件.


  • « Mac小技巧
  • Maven:mirror和repository区别 »

Published

4 1, 2019

Category

druid

Tags

  • druid 3
  • Powered by Pelican. Theme: Elegant by Talha Mansoor