When working with Druid, you sometimes need to update data that has already been loaded. The usual approach is to dump the original data, apply the updates, and then load the result back in. At the moment, dumping that data means going through the Overlord and operating with the command-line tool, which writes the dump to a file. When the data volume is large and we want to process segments in parallel, this is hard to manage, because there is no usable API. Yesterday I went through the source code and modified the DumpSegment tool so that it can be called like an API.
package io.druid.cli;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularities;
import io.druid.guice.annotations.Json;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.filter.Filters;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.chrono.ISOChronology;
import org.roaringbitmap.IntIterator;
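
/**
 * A standalone variant of Druid's io.druid.cli.DumpSegment tool. The original tool is wired up
 * through Guice and can only write its output to a file or stdout; this version builds the
 * IndexIO dependency by hand and exposes runDump() as a method that returns the dumped rows,
 * so it can be called like an API (for example, from a Spark job).
 */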
public class TestDumpSegment3
{
  private static final Logger log = new Logger(TestDumpSegment3.class);

  public static IndexIO indexIO;
  public static ColumnConfig columnConfig = null;
  public static ObjectMapper objectMapper;

  static {
    columnConfig = new DruidProcessingConfig()
    {
      @Override
      public String getFormatString()
      {
        return "processing-%s";
      }

      @Override
      public int intermediateComputeSizeBytes()
      {
        return 100 * 1024 * 1024;
      }

      @Override
      public int getNumThreads()
      {
        return 1;
      }

      @Override
      public int columnCacheSizeBytes()
      {
        return 25 * 1024 * 1024;
      }
    };
    objectMapper = new DefaultObjectMapper();
    indexIO = new IndexIO(objectMapper, columnConfig);
  }
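
  // Dump modes carried over from the original DumpSegment tool; this example only exercises ROWS.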
  private enum DumpType
  {
    ROWS,
    METADATA,
    BITMAPS
  }

  public TestDumpSegment3()
  {
  }

  public String directory;                                       // carried over from the original tool; unused in this example
  public String outputFileName;                                  // used by runMetadata/runBitmaps; null means System.out
  public String filterJson = null;                               // optional DimFilter (as JSON) applied when dumping rows
  public List<String> columnNamesFromCli = Lists.newArrayList(); // columns to dump; empty means all columns
  public boolean timeISO8601 = false;                            // if true, render __time as an ISO8601 string
  public String dumpTypeString = DumpType.ROWS.toString();
  public boolean decompressBitmaps = false;                      // if true, runBitmaps writes row-number arrays instead of base64
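
  // Example entry point: load a segment from a local directory and print every row as JSON.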
  public static void main(String[] args)
  {
    TestDumpSegment3 testDumpSegment3 = new TestDumpSegment3();
    testDumpSegment3.outputFileName = "/Users/xixuebin/Downloads/123"; // only used by the file-based dump modes
    try (final QueryableIndex index = indexIO.loadIndex(new File(
        "/Users/xixuebin/Downloads/0" // path to the directory containing the smoosh files
    ))) {
      List<String> result = testDumpSegment3.runDump(index);
      for (String line : result) {
        System.out.println(line);
      }
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
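
  // Writes the result of a SegmentMetadataQuery as JSON. Kept from the original tool;
  // it still requires a Guice injector and is not called by main().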
  private void runMetadata(final Injector injector, final QueryableIndex index) throws IOException
  {
    final ObjectMapper objectMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class))
                                              .copy()
                                              .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
    final SegmentMetadataQuery query = new SegmentMetadataQuery(
        new TableDataSource("dataSource"),
        new SpecificSegmentSpec(new SegmentDescriptor(index.getDataInterval(), "0", 0)),
        new ListColumnIncluderator(getColumnsToInclude(index)),
        false,
        null,
        EnumSet.allOf(SegmentMetadataQuery.AnalysisType.class),
        false,
        false
    );
    withOutputStream(
        new Function<OutputStream, Object>()
        {
          @Override
          public Object apply(final OutputStream out)
          {
            evaluateSequenceForSideEffects(
                Sequences.map(
                    executeQuery(injector, index, query),
                    new Function<SegmentAnalysis, Object>()
                    {
                      @Override
                      public Object apply(SegmentAnalysis analysis)
                      {
                        try {
                          objectMapper.writeValue(out, analysis);
                        }
                        catch (IOException e) {
                          throw Throwables.propagate(e);
                        }
                        return null;
                      }
                    }
                )
            );
            return null;
          }
        }
    );
  }
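
  // The core modification: the original tool streamed each row to an output stream; this version
  // serializes each row to a JSON string and returns them all as a List, so callers
  // (e.g. a Spark job) can consume the dump programmatically.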
  public List<String> runDump(final QueryableIndex index) throws IOException
  {
    final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(index);
    final List<String> columnNames = getColumnsToInclude(index);
    final DimFilter filter = filterJson != null ? objectMapper.readValue(filterJson, DimFilter.class) : null;
    final List<String> result = Lists.newArrayList();
    final Sequence<Cursor> cursors = adapter.makeCursors(
        Filters.toFilter(filter),
        index.getDataInterval().withChronology(ISOChronology.getInstanceUTC()),
        QueryGranularities.ALL,
        false
    );
    final Sequence<Object> sequence = Sequences.map(
        cursors,
        new Function<Cursor, Object>()
        {
          @Override
          public Object apply(Cursor cursor)
          {
            final List<ObjectColumnSelector> selectors = Lists.newArrayList();
            for (String columnName : columnNames) {
              selectors.add(makeSelector(columnName, index.getColumn(columnName), cursor));
            }
            while (!cursor.isDone()) {
              final Map<String, Object> row = Maps.newLinkedHashMap();
              for (int i = 0; i < columnNames.size(); i++) {
                final String columnName = columnNames.get(i);
                final Object value = selectors.get(i).get();
                if (timeISO8601 && columnNames.get(i).equals(Column.TIME_COLUMN_NAME)) {
                  row.put(columnName, new DateTime(value, DateTimeZone.UTC).toString());
                } else {
                  row.put(columnName, value);
                }
              }
              try {
                result.add(objectMapper.writeValueAsString(row));
              }
              catch (JsonProcessingException e) {
                // Fail fast instead of silently skipping rows that cannot be serialized.
                throw Throwables.propagate(e);
              }
              cursor.advance();
            }
            return null;
          }
        }
    );
    evaluateSequenceForSideEffects(sequence);
    return result;
  }
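
  // Dumps the per-value bitmap indexes of the selected columns as JSON, either base64-encoded
  // or, when decompressBitmaps is set, as arrays of row numbers. Also kept from the original tool.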
  private void runBitmaps(final Injector injector, final QueryableIndex index) throws IOException
  {
    final ObjectMapper objectMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
    final BitmapFactory bitmapFactory = index.getBitmapFactoryForDimensions();
    final BitmapSerdeFactory bitmapSerdeFactory;
    if (bitmapFactory instanceof ConciseBitmapFactory) {
      bitmapSerdeFactory = new ConciseBitmapSerdeFactory();
    } else if (bitmapFactory instanceof RoaringBitmapFactory) {
      bitmapSerdeFactory = new RoaringBitmapSerdeFactory(null);
    } else {
      throw new ISE(
          "Don't know which BitmapSerdeFactory to use for BitmapFactory[%s]!",
          bitmapFactory.getClass().getName()
      );
    }
    final List<String> columnNames = getColumnsToInclude(index);
    withOutputStream(
        new Function<OutputStream, Object>()
        {
          @Override
          public Object apply(final OutputStream out)
          {
            try {
              final JsonGenerator jg = objectMapper.getFactory().createGenerator(out);
              jg.writeStartObject();
              jg.writeObjectField("bitmapSerdeFactory", bitmapSerdeFactory);
              jg.writeFieldName("bitmaps");
              jg.writeStartObject();
              for (final String columnName : columnNames) {
                final Column column = index.getColumn(columnName);
                final BitmapIndex bitmapIndex = column.getBitmapIndex();
                if (bitmapIndex == null) {
                  jg.writeNullField(columnName);
                } else {
                  jg.writeFieldName(columnName);
                  jg.writeStartObject();
                  for (int i = 0; i < bitmapIndex.getCardinality(); i++) {
                    jg.writeFieldName(Strings.nullToEmpty(bitmapIndex.getValue(i)));
                    final ImmutableBitmap bitmap = bitmapIndex.getBitmap(i);
                    if (decompressBitmaps) {
                      jg.writeStartArray();
                      final IntIterator iterator = bitmap.iterator();
                      while (iterator.hasNext()) {
                        final int rowNum = iterator.next();
                        jg.writeNumber(rowNum);
                      }
                      jg.writeEndArray();
                    } else {
                      jg.writeBinary(bitmapSerdeFactory.getObjectStrategy().toBytes(bitmap));
                    }
                  }
                  jg.writeEndObject();
                }
              }
              jg.writeEndObject();
              jg.writeEndObject();
              jg.close();
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
            return null;
          }
        }
    );
  }
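
  // Shared by all dump modes: resolves the requested column names against the segment.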
  private List<String> getColumnsToInclude(final QueryableIndex index)
  {
    final Set<String> columnNames = Sets.newLinkedHashSet(columnNamesFromCli);

    // Empty columnNames => include all columns.
    if (columnNames.isEmpty()) {
      columnNames.add(Column.TIME_COLUMN_NAME);
      Iterables.addAll(columnNames, index.getColumnNames());
    } else {
      // Remove any provided columns that do not exist in this segment.
      for (String columnName : ImmutableList.copyOf(columnNames)) {
        if (index.getColumn(columnName) == null) {
          columnNames.remove(columnName);
        }
      }
    }

    return ImmutableList.copyOf(columnNames);
  }
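
  // Runs the given function against the configured output file, or System.out when no file is set.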
  private <T> T withOutputStream(Function<OutputStream, T> f) throws IOException
  {
    if (outputFileName == null) {
      return f.apply(System.out);
    } else {
      try (final OutputStream out = new FileOutputStream(outputFileName)) {
        return f.apply(out);
      }
    }
  }
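
  // Executes a query directly against the single in-memory segment, on the calling thread.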
  private static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
  {
    final QueryRunnerFactoryConglomerate conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class);
    final QueryRunnerFactory factory = conglomerate.findFactory(query);
    final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment("segment", index));
    final Sequence results = factory.getToolchest().mergeResults(
        factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner>of(runner))
    ).run(query, Maps.<String, Object>newHashMap());
    return (Sequence<T>) results;
  }
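
  // Druid Sequences are lazy; accumulating with a no-op Accumulator forces the mapped
  // side effects above to actually run.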
  private static <T> void evaluateSequenceForSideEffects(final Sequence<T> sequence)
  {
    sequence.accumulate(
        null,
        new Accumulator<Object, T>()
        {
          @Override
          public Object accumulate(Object accumulated, T in)
          {
            return null;
          }
        }
    );
  }
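
  // Builds a selector for one column: dictionary-encoded dimensions are read as String
  // (or List<String> when multi-valued); other columns fall back to a generic
  // ObjectColumnSelector, or a null-returning stub if none can be created.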
  private static ObjectColumnSelector makeSelector(
      final String columnName,
      final Column column,
      final ColumnSelectorFactory columnSelectorFactory
  )
  {
    final ObjectColumnSelector selector;

    if (column.getDictionaryEncoding() != null) {
      // Special case for dimensions -> always wrap multi-value in arrays
      final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(
          new DefaultDimensionSpec(columnName, columnName)
      );
      if (column.getDictionaryEncoding().hasMultipleValues()) {
        return new ObjectColumnSelector<List>()
        {
          @Override
          public Class<List> classOfObject()
          {
            return List.class;
          }

          @Override
          public List<String> get()
          {
            final IndexedInts row = dimensionSelector.getRow();
            if (row.size() == 0) {
              return null;
            } else {
              final List<String> retVal = Lists.newArrayList();
              for (int i = 0; i < row.size(); i++) {
                retVal.add(dimensionSelector.lookupName(row.get(i)));
              }
              return retVal;
            }
          }
        };
      } else {
        return new ObjectColumnSelector<String>()
        {
          @Override
          public Class<String> classOfObject()
          {
            return String.class;
          }

          @Override
          public String get()
          {
            final IndexedInts row = dimensionSelector.getRow();
            return row.size() == 0 ? null : dimensionSelector.lookupName(row.get(0));
          }
        };
      }
    } else {
      final ObjectColumnSelector maybeSelector = columnSelectorFactory.makeObjectColumnSelector(columnName);
      if (maybeSelector != null) {
        selector = maybeSelector;
      } else {
        // Selector failed to create (unrecognized column type?)
        log.warn("Could not create selector for column[%s], returning null.", columnName);
        selector = new ObjectColumnSelector()
        {
          @Override
          public Class classOfObject()
          {
            return Object.class;
          }

          @Override
          public Object get()
          {
            return null;
          }
        };
      }
    }

    return selector;
  }
}
With this tool you can obtain the dump results directly in code. For example, pull the class into Spark, read the smoosh files directly, and after processing write the output as Parquet files.
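As a rough sketch of that Spark-to-Parquet flow (not taken from the original post): the driver class below, its name, and both paths are hypothetical, and it assumes the Spark 2.2+ Java API, where spark.read().json() accepts a Dataset<String> of the JSON rows that runDump returns.

import java.io.File;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import io.druid.cli.TestDumpSegment3;
import io.druid.segment.QueryableIndex;

public class SegmentToParquet
{
  public static void main(String[] args) throws Exception
  {
    SparkSession spark = SparkSession.builder()
        .appName("druid-segment-to-parquet")
        .getOrCreate();

    // Placeholder path: a local segment directory containing the smoosh files.
    File segmentDir = new File("/path/to/segment/0");

    // Dump the segment to JSON strings using the modified tool above.
    TestDumpSegment3 dumper = new TestDumpSegment3();
    List<String> jsonRows;
    try (QueryableIndex index = TestDumpSegment3.indexIO.loadIndex(segmentDir)) {
      jsonRows = dumper.runDump(index); // one JSON object per row
    }

    // Parse the JSON rows into a DataFrame and write it out as Parquet.
    Dataset<Row> df = spark.read().json(spark.createDataset(jsonRows, Encoders.STRING()));
    df.write().parquet("/path/to/output/parquet");

    spark.stop();
  }
}

To actually run dumps in parallel, the loadIndex/runDump pair could be moved into a mapPartitions over a list of segment directories; the sketch keeps everything on the driver for brevity.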