LucidDbCommonRelationalSubExpressionMaterialization

From LucidDB Wiki
Jump to: navigation, search

Contents

Overview

This document describes the enhancements necessary in LucidDB and its underlying Farrago and Fennel infrastructures to support materialization of common relational subexpressions. Common relational subexpressions refer to identical relational expressions (e.g., joins between the same set of tables or a scan on a particular table with a particular filter condition) that are executed more than once as part of executing a user query.

By materializing the first instance of the common relational subexpression, we can reuse that materialized result for subsequent instances of that subexpression within the same query. This is particularly beneficial if the common relational subexpression is complex, e.g., a join between several tables, and its result is small. Having a small result means less space and time will be required to store and read the materialized result. So, instead of incurring the cost of re-executing the common relational subexpression, we only incur the cost of storing the result and subsequently reading it.

Use Cases

Two examples that could benefit from common relational subexpression materialization are:

  1. queries with a count-distinct aggregate and some other aggregate
  2. queries involving semijoins

Aggregations

If a query contains a count-distinct plus any other aggregate, the relational expression that provides input into the aggregations needs to be executed twice, once for the count-distinct and once for the other aggregates. That's a result of the rewrite done by the optimizer rule RemoveDistinctAggregateRule. You can see the two instances by looking at the explain plan for the following query, which has a count-distinct and sum aggregate on a view. The view is a join between two tables, SALES and PRODUCT. Note that there are two LhxJoinRels in the plan, each corresponding to a join between SALES and PRODUCT. The RelNodes corresponding to the common relational subexpression are highlighted in bold.

'FennelToIteratorConverter'
'  FennelReshapeRel(projection=[[1, 0]], outputRowType=[RecordType(BIGINT NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL])'
'    FennelCartesianProductRel(leftouterjoin=[false])'
'      FennelAggRel(groupCount=[0], EXPR$1=[SUM(1)])'
'        FennelReshapeRel(projection=[[3, 1]], outputRowType=[RecordType(CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER QUANTITY) NOT NULL[)'
'          LhxJoinRel(leftKeys=[[0]], rightKeys=[[2]], joinType=[INNER])'
'            LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[[1, 4]], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID, SYS$CLUSTERED_INDEX$SALES$QUANTITY]])'
'            FennelReshapeRel(projection=[[0, 1, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER CAST($0):INTEGER) NOT NULL])'
'              LcsRowScanRel(table=[[LOCALDB, SJ, PRODUCT]], projection=[[0, 1]], clustered indexes=[[SYS$CLUSTERED_INDEX$PRODUCT$ID, SYS$CLUSTERED_INDEX$PRODUCT$NAME]])'
'      FennelBufferRel(inMemory=[false], multiPass=[true])'
'        FennelAggRel(groupCount=[0], EXPR$0=[COUNT(0)])'
'          LhxAggRel(groupCount=[1])'
'            FennelReshapeRel(projection=[[2]], outputRowType=[RecordType(CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME) NOT NULL])'
'              LhxJoinRel(leftKeys=[[0]], rightKeys=[[2]], joinType=[INNER[)'
'                LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[[1]], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID]])'
'                FennelReshapeRel(projection=[[0, 1, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER CAST($0):INTEGER) NOT NULL])'
'                  LcsRowScanRel(table=[[LOCALDB, SJ, PRODUCT]], projection=[[0, 1]], clustered indexes=[[SYS$CLUSTERED_INDEX$PRODUCT$ID, SYS$CLUSTERED_INDEX$PRODUCT$NAME]])'

By materializing the join between SALES and PRODUCT, we can avoid executing that join twice.

Semijoins

A semijoin can be used to process a star join between a fact and dimension table by using the dimension table to filter the fact table. Typically, the dimension table is filtered so it in turn can filter the fact table. Unless the join between the fact and dimension tables can be removed through this optimization, the scan of the dimension table will need to be done twice. This redundant execution may not be significant for a small dimension table. But if the dimension table is large or chained semijoins are being used, then materializing that portion of the query can be beneficial.

The following shows an example where a chained semijoin is used. STATE is filtered on the value 'New York', and it turn filters CUSTOMER, which finally filters the SALES fact table. The relational subexpression corresponding to the scan on STATE followed by the index lookup and scan on CUSTOMER is executed twice -- once to process the semijoin to filter SALES, and then to join SALES and CUSTOMER. Again, the common relational subexpression RelNodes are highlighted in bold.

explain plan for
select * from sales s, state st, customer c
   where s.customer = c.id and c.city = st.city and
   st.state = 'New York';
'FennelToIteratorConverter'
'  FennelReshapeRel(projection=[[0, 1, 2, 3, 4, 8, 9, 5, 6, 7]], outputRowType=[RecordType(INTEGER SID, INTEGER PRODUCT_ID, INTEGER SALESPERSON, INTEGER CUSTOMER, INTEGER QUANTITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" STATE, INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" COMPANY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY0) NOT NULL])'
'    LhxJoinRel(leftKeys=[[3]], rightKeys=[[5]], joinType=[INNER])'
'      LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$CUSTOMER, SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID, SYS$CLUSTERED_INDEX$SALES$QUANTITY, SYS$CLUSTERED_INDEX$SALES$SALESPERSON, SYS$CLUSTERED_INDEX$SALES$SID]])'
'        LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[2])'
'          LcsIndexSearchRel(table=[[LOCALDB, SJ, SALES]], index=[I_SALES_CUST], projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'            FennelSortRel(key=[[0]], discardDuplicates=[false])'
'              FennelReshapeRel(projection=[[0]], outputRowType=[RecordType(INTEGER ID) NOT NULL])'
'                LcsRowScanRel(table=[[LOCALDB, SJ, CUSTOMER]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$CUSTOMER$ID]])'
'                  LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[1])'
'                    LcsIndexSearchRel(table=[[LOCALDB, SJ, CUSTOMER]], index=[I_CUSTOMER_CITY], projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'                      FennelSortRel(key=[[0]], discardDuplicates=[false])'
'                        LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'                          FennelValuesRel(tuples=[[{ '[', 'New York            ', ']', 'New York            ' }]])'
'      FennelReshapeRel(projection=[[0, 1, 2, 3, 4, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" COMPANY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY0, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" STATE, INTEGER CAST($0):INTEGER) NOT NULL])'
'        LhxJoinRel(leftKeys=[[2]], rightKeys=[[0]], joinType=[INNER])'
'          LcsRowScanRel(table=[[LOCALDB, SJ, CUSTOMER]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$CUSTOMER$CITY, SYS$CLUSTERED_INDEX$CUSTOMER$COMPANY, SYS$CLUSTERED_INDEX$CUSTOMER$ID]])'
'            LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[1])'
'              LcsIndexSearchRel(table=[[LOCALDB, SJ, CUSTOMER]], index=[I_CUSTOMER_CITY[, projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'                FennelSortRel(key=[[0]], discardDuplicates=[false])'
'                  LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'                    FennelValuesRel(tuples=[[{ '[', 'New York            ', ']', 'New York            ' }]])'
'          LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'            FennelValuesRel(tuples=[[{ '[', 'New York            ', ']', 'New York            ' }]])'

Fennel Changes

New and Modified Classes

Fennel already has a class named SegBufferExecStream that buffers its input and then returns the buffered data as its output. The buffered data can be returned multiple times, which is required if the buffered data is the right hand input into a cartesian join.

What we need to materialize common relational subexpressions is to buffer an input and then make the buffered data available to multiple consumer streams. One design that was initially considered was to extend SegBufferExecStream, making it a DiffluenceExecStream, so it can return multiple outputs. In other words, assuming X is the input that needs to be buffered, MultiOutputSegBufferExecStream is an extension of SegBufferExecStream, and each input into CartesianJoinExecStream consumes the buffered data, the stream graph would look like the following:

     CartesianJoinExecStream
        /             \
  MultiOutputSegBufferExecStream
              |
              X

The problem with this design though is that it does not handle the case where the consumer of the buffered data needs to do restart-open of the buffered data, as is the case for a cartesian product join. It's not possible to restart one of the outputs without restarting both. (Nested loop joins also potentially have this problem.)

The design we decided to go with instead separates the functionality into two new streams -- a writer and a reader version of SegBufferExecStream. The writer stream will be named SegBufferWriterExecStream, and the reader stream will be named SegBufferReaderExecStream. Both new streams are subsets of the functionality currently in SegBufferExecStream. SegBufferWriterExecStream takes a single input and writes the input into a buffer, so it can be read by one or more SegBufferReaderExecStreams. It writes as its output a single tuple containing the first pageId of the buffered data it has created. That pageId is written to one or more consumer SegBufferReaderExecStreams. Each SegBufferReaderExecStream takes that input, reads the buffered data generated by the corresponding SegBufferWriterExecStream, and writes the buffered data it reads to its output. Having separate streams also makes the design cleaner.

For example, a simple stream graph with two SegBufferReaderExecStreams will look like the following.

                       MergeExecStream
                      /               \
 SegBufferReaderExecStream      SegBufferReaderExecStream
               ^                              ^
                \                            /
                 \                          /
                  SegBufferWriterExecStream
                             |
                       writer's input

Another variation of this design that was also considered was to communicate the pageId of the buffered data via a dynamic parameter instead of through the dataflow between the two streams. As a result, there would be no explicit dataflow between the two streams and only an implicit dataflow so that the streams are connected. However, having implicit dataflows requires adding new code to the schedulers, and you no longer have the explicit dataflows that the scheduler can use to decide which consumer to execute next after the writer stream has finished buffering its input.

Since there is existing code in SegBufferExecStream that can be reused by the new classes, the common code will be refactored into two new helper classes -- SegBufferReader and SegBufferWriter. Both helper classes will obviously be used by SegBufferExecStream, while SegBufferWriterExecStream only needs a SegBufferWriter, and SegBufferReaderExecStream only needs a SegBufferReader.

One difference between the new classes and SegBufferExecStream is that the latter has an option that allows the buffered data to be regenerated when the stream is reopened in restart mode. The stream has a multipass parameter which needs to be set to false if those are the desired semantics. That option is not supported in the new streams because it may not be possible to destroy the buffered data when SegBufferWriterExecStream is reopened in restart mode. The buffered data may still be in use by SegBufferReaderExecStreams elsewhere in the stream graph. To support this, we would have to support multiple sets of active buffers, and keep track of which readers are using which buffers. Since the existing use cases don't need this functionality, to simplify things, we will not support it.

Scheduler Changes

As noted in the previous section, because we are not using implicit dataflows between the reader and writer streams, we do not need to modify the schedulers to handle implicit dataflows.

However, there is one change that is needed in DfsTreeExecStreamScheduler. Currently, when determining which consumer stream to execute after a producer stream has executed, the scheduler looks for the first stream it finds that has an incoming dataflow that is a non-underflow state. However, what we want is to give lower priority to streams that have an empty state, as those are streams that have not explicitly pulled from their inputs. This makes a difference in some cases. By executing a stream with an empty incoming dataflow vs a stream that has data in its input stream, a hang will result in the following scenario. If the SegBufferReaderExecStream that initiated the request is upstream from a producer that's a sink in the middle of the stream graph, e.g., a producer that's the input into a UDX, and it's not in the first outgoing dataflow from the SegBufferWriterExecStream, then that initiating SegBufferReaderExecStream will never get rescheduled. The SegBufferReaderExecStream which did not explicitly request data will be executed instead.

To address this, SegBufferWriterExecStream needs to first write to its outputs that are in an underflow state over those with an empty state. And DfsTreeExecStreamScheduler::findNextConsumer needs to ignore empty dataflows if there are other available non-underflow dataflows.

The parallel scheduler (ParallelExecStreamScheduler) does not need to be changed because it already schedules all non-underflow consumers to execute after a stream has produced.

Early Closes

Fennel supports early closes. This allows an execution stream to close all of its producers and their producers after it has finished some part of its execution. For example, ExternalSortExecStream does an early close as soon as it has finished reading all of its input and is ready to start sorting the data. By closing certain producers early instead of waiting for the entire stream graph to be closed, resources associated with those early-closing producers can be freed up early.

Early closes are also necessary to make certain types of queries work correctly. For example, if the underlying data storage does not support page versioning, a self-insert statement like the following:

insert into tab select * from tab;

requires an early close after the select portion of the stream graph has finished processing all its data. That's because the select portion of the stream is holding a shared lock on the last page it has read. The insert portion of the stream graph may need to insert data into that same page and will wait until it can acquire exclusive access to the page. This results in the statement hanging. Doing an early close releases the shared lock and allows the insert to acquire the exclusive lock.

Since the SegBufferWriterExecStream creates the buffered data that's read by the reader streams, it also makes sense for it to destroy it. However, it cannot destroy the data until all of its readers have finished executing. Ignoring early closes for the moment, that works ok because readers end up being closed before the corresponding writer and therefore, there will be no active readers of the buffered data when the SegBufferWriterExecStream is closed.

Early closes, however, pose problems. Consider the following stream, where the SBW is a SegBufferWriterExecStream and the SBRs are corresponding SegBufferReaderExecStreams.

  SBW -----> SBR --
   |               \ 
   |                Y ----- Z 
   |               /       /
   |           X --  SBR --
   |                  ^
   |                  |     
   |-------------------

If Y initiates an early close, because SBW is connected to the top SBR, that results in the SegBufferWriterExecStream being closed, which results in the buffered data that's shared being destroyed, even though the bottom SegBufferReaderExecStream may not have finished, or even started, reading the buffered data.

To address early closes, a dynamic parameter will be shared between a SegBufferWriterExecStream and its corresponding SegBufferReaderExecStreams. That parameter will be used as a reference counter to keep track of the number of active readers of the buffer generated by the SegBufferWriterExecStream. The counter will be incremented whenever any of the SegBufferReaderExecStreams are opened, and it will be decremented whenever they are closed. If the reference count is non-zero, when an attempt is made to early close a SegBufferWriterExecStream, the early close on the SegBufferWriterExecStream will be ignored. A new method canEarlyClose will be added to ExecStream that returns true by default. SegBufferWriterExecStream will override the method and return a value based on the value of the reference counter.

New Dynamic Parameter Counters

To support dynamic parameters that are used as reference counters, the following methods will be added to the DynamicParamManager class:

/**
 * Creates a new dynamic parameter that will be used as a counter.
 * Initializes the parameter value to 0.
 *
 * @param dynamicParamId unique ID of parameter within this manager; IDs
 * need not be contiguous, and must be assigned by some other authority
 *
 * @param failIfExists if true (the default) an assertion failure
 * will occur if dynamicParamId is already in use
 */
void createCounterParam(
    DynamicParamId dynamicParamId,
    bool failIfExists = true);

/**
 * Increments a dynamic parameter that corresponds to a counter.
 *
 * @param dynamicParamId ID with which the counter parameter was created
 */
void incrementCounterParam(DynamicParamId dynamicParamId);

/**
 * Decrements a dynamic parameter that corresponds to a counter.
 *
 * @param dynamicParamId ID with which the counter parameter was created
 */
void decrementCounterParam(DynamicParamId dynamicParamId);

Dynamic parameters that are created as counters are different from existing dynamic parameters in that the new increment and decrement methods can be called on them. The increment and decrement are atomic operations. The existing methods to read and set dynamic parameter values can also be used on them. Their values will be stored as 8-byte signed integers. Lastly, their ids co-exist with the ids of regular dynamic parameters. In other words, once you've assigned a dynamic parameter id to a counter parameter, you cannot use that same id for a different parameter, even if that other parameter is a non-counter.

Farrago Changes

Hep Changes

LucidDB uses Hep to optimize queries. Hep executes HepInstructions, which correspond to some collection of optimizer rules. A new type of instruction will be added to Hep, corresponding to rules that will only be applied on common relational subexpressions.

Common relational subexpressions will be detected by examining the query graph that Hep builds. Each vertex in the graph corresponds to a relational expression in the query. Directed edges connect the vertices together representing parent-child relationships between the different relational expressions. Any vertex in the graph that has more than one parent vertex is a common relational subexpression because it means that the subexpression appears in multiple places in the graph. Note, however, that if the same relational expression appears as multiple inputs into the same parent, there is only a single edge from the parent vertex to the child vertex. So, simply counting the number of edges into a vertex is not sufficient.

A new abstract base class named CommonRelSubExprRule will be created. The rule will extend from RelOptRule. Any rules that potentially can be applied on common relational subexpressions will derive from this new base class.

Hep will be extended so the new instruction can be added to the appropriate places in a HepProgram. When the new instruction is invoked, Hep will attempt to pattern match any CommonRelSubExprRules against subgraphs where the root vertex is a common relational subexpression. Once a match is found, Hep will pass along a list of the parent vertices corresponding to the common relational subexpression to the matching rule. The CommonRelSubExprRule can use this additional information in its decision making logic. For example, from the list of parent vertices, it can determine how many times the common relational subexpression is used and use that number in some cost calculation.

Converting the Common Relational Subexpressions

A new rule named BufferCommonRelSubExprRule will derive from CommonRelSubExprRule. The rule simply matches on any RelNode.

Based on the number of times the common relational subexpression is used (N), the rule will compare the cost of executing the common relational subexpression N times versus the cost of executing it once, buffering the result, and reading the buffered result N times. If the rule determines that buffering is beneficial, it will place a FennelMultiUseBufferRel on top of the original relational expression. The new class FennelMultiUseBufferRel will be derived from the existing FennelBufferRel class.

To avoid an infinite loop, the rule is a no-op if the matching RelNode is already a FennelMultiUseBufferRel.

Note that when the FennelMultiUseBufferRel is created, there is no distinction yet between whether the FennelMultiUseBufferRel is being used as a reader or writer. That distinction will not be made until either FennelMultiUseBufferRel.toStreamDef or FennelMultiUseBufferRel.implementFennelChild is called.

For each FennelMultiUseBufferRel object, we want to create a single writer stream and one reader stream for each reference to that object in the stream graph. The child input into the FennelMultiUseBufferRel will only feed into the writer stream. That means then that we only want to call the method pairs FennelMultiUseBufferRel.visitFennelChild and FennelMultiUseBufferRel.implementFennelChild once for the child input of a FennelMultiUseBufferRel object. Thus, those calls should only correspond to a writer instance of a streamDef. Note that when a stream graph has a mix of Fennel and Java RelNodes, there need to be matching calls to visitFennelChild and implementFennelChild. Otherwise, if implementFennelChild is called, but there is no corresponding visitFennelChild call, this will result in dangling FarragoTransformDefs.

It might seem like doing the extra book-keeping necessary to avoid the extra calls is superfluous, and we can always call visitFennelChild/implementFennelChild multiple times for a FennelMultiUseBufferRel object and simply throw away the results. However, consider the following query tree, where the "B"s are the buffering nodes.

     Z
    / \
   Y  B1
  / \
 B2 B2
 |   |
 X   X
 |   |
 B1 B1

If we were to call implementFennelChild on every node, but throw away the unneeded results from the visitFennelChild calls, we would end up with the following stream graph. The "R"s are the reader streams, and the "W"s are the writer streams.

     Z
    / \
   Y   \
  / \   \
 R2 R2   \
  \ /     \ 
   W2      |
   |       |
   X  X    |
   |  |    |
  R1 R1    R1
   \ /     |
    W1-----|

Note that the rightmost X ends up producing data to nowhere. That's because when visitFennelChild was called on the rightmost B2, that resulted in the rightmost X-R1 stream being created. Since W1 is the writer stream for B1, a dataflow is added from W1 to all of the "R1"s, resulting in the dangling streamDef.

Deciding which FennelMultiUseBufferRel instance should be assigned the role of writer will be based on the instance that calls either implementFennelChild or toStreamDef first. Both calls need to be taken into account. If we make the assignment based solely on only one of the two calls, then that doesn't work for a case like the following. Again, the "B"s are the FennelMultiUseBufferRels, and the "IFC"s are IteratorToFennelConverters.

        -- X --
       /       \
       B        Y
       |      /   \
      IFC1  IFC2   Z
              |
              B
              |
             IFC1

If we assign the writer based on the instance that calls toStreamDef first, then the lower B, which is a child of a IteratorToFennelConverter, ends up calling toStreamDef first, and is therefore assigned the writer role. However, implementFennelChild was already called on the topmost B. Thus, when converting IFC2 to a streamDef, we end up with mismatched RelPathEntrys because we only have the RelPathEntry list corresponding to the topmost B, rather than the desired lower one.

Doing the reverse also doesn't solve the problem. If we assign the writer based on the instance that calls implementFennelChild first, then the topmost B is the writer, but because toStreamDef ends up being called first on the lower B, we again end up with a mismatch.

To properly assign the writer role, we'll need to extend FarragoRelImplementor so it maintains two new maps:

  • A map from a RelNode to its RelPathEntry list at the time of the first call to either toStreamDef or implementFennelChild. Once the first call has been made, subsequent calls to those methods will not modify the map.
  • A map from a RelNode to the list of FemExecutionStreamDefs that it has been converted to.

This first map will be maintained as part of a new interface method that will be added to FennelRelImplementor -- isFirstTranslationInstance. FennelMultiUseBufferRel will call this new method in both its toStreamDef and implementFennelChild methods. If the call returns true, then we know that the current instance is the writer instance.

The second map will be maintained as part of the existing call to FennelRelImplementor.registerRelStreamDef. A new method FennelRelImplementor.getRegisteredStreamDefs(RelNode) will be added to retrieve the mapping.

In FennelMultiUseBufferRel.implementFennelChild, if the instance is not the writer, then implementFennelChild is not called on the child. In FennelMultiUseBufferRel.toStreamDef, if the instance is the writer, then both a writer and reader stream will be created. Otherwise, only a reader stream will be created. If a writer stream hasn't yet been created when a reader stream is created, we'll have to defer setting the incoming dataflows and dynamic parameter for the stream until later when we have the writer stream available. That's why we need to keep track of all of the streams that have already been created for the FennelMultiUseBufferRel object. Once we've created the writer stream, then we will need to go back and create the dataflows from the newly created writer to those previously created readers.

Also, only if the current instance has created a writer will we call visitFennelChild on the child input and create a dataflow from the input to the writer.

Applying the Common Relational Subexpression Rule

LucidDB's Hep program will be to be extended to add the new instruction for common relational subexpression rules. It would seem like for simplicity, we could just apply the rule once late in the Hep program to allow other optimizer rules to do all the necessary conversions to create the full range of common relational subexpressions. For example, it would not make sense to apply the instruction before RemoveDistinctAggregateRule is called because the common relational subexpression corresponding to the aggregate inputs have not been created yet.

However, applying the rule too late also does not work. To see why, study the explain outputs from the use case section. For the aggregation example, look at the LcsRowScanRels that scan on SALES. Notice that they have different projections. The first is [[1,4]] while the second is only [[1]]. As a result, the two relational expressions corresponding to those joins are not the same.

The problem also occurs in the semijoin example. Note that the projections in the row scans on CUSTOMER are also different. The one used as part of the semijoin only projects a single column while the one that's part of the join projects all columns.

To ensure that there are common relational subexpressions that will allow count-distinct aggregations and semijoins to be optimized, the common relational subexpression instruction needs to be applied according to the following criteria:

  1. The new rule needs to be applied after the rules that add the common relational subexpressions (RemoveDistinctAggregateRule and LcsIndexSemiJoinRule) have been applied.
  2. After the common relational subexpressions have been introduced and before the new rule is applied, there cannot be any pushdowns of projections that will result in eliminating the common relational subexpressions.
  3. The new rule needs to be applied after rules that remove joins (LoptOptimizeJoinRule and LoptRemoveSelfJoinRule) are applied. Otherwise, if there are common relational subexpressions that participate in joins that are removed by either of these rules, then we could end up buffering a relational subexpression that ends up only being used once in the final query tree. Or worse, the buffering could prevent the join from being removed.
  4. The new rule needs to be applied after filters have been pushed down. Otherwise, a common relational subexpression corresponding to a cartesian product join could be buffered.

To satisfy the above criteria, the following will be done:

  • We'll add the new instruction call once -- after semijoin-related rules are applied. Since semijoin-related rules are applied after the join ordering and removal rules, that satisfies criteria 3 and 4.
  • Move the application of RemoveDistinctAggregateRule so it's done later, but before the join ordering rules are applied, since RemoveDistinctAggregateRule can introduce a new join. This also needs to be done so criteria 2 is met. Currently, RemoveDistinctAggregateRule is applied very early in LucidDB's Hep program. In order to meet criteria 2, had we left the rule where it currently is, we would have had to add a second instruction call to locate common relational subexpressions early in the Hep program. Doing so, however, would violate rules 3 and 4.

These changes will address both use cases. In the future, as additional use cases for materializing common relational subexpressions are identified, if the existing instruction call is insufficient to detect those common relational subexpressions, then further tweaking of the instruction calls will be required.

Unit Tests

Fennel Unit Tests

New tests will be added to ExecStreamTestSuite to exercise the new Fennel classes. We can use the following simple stream graphs in the test cases.

                     MergeExecStream
                    /               \
 SegBufferReaderExecStream     SegBufferReaderExecStream
              ^                           ^
               \                         /
                SegWriterBufferExecStream
                           |
                 MockProducerExecStream


                 CartesianJoinExecStream
                    /               \
 SegBufferReaderExecStream     SegBufferReaderExecStream
              ^                           ^
               \                         /
                SegWriterBufferExecStream
                           |
                 MockProducerExecStream

The stream graph with CartesianJoinExecStream will exercise restart-opens on the SegBufferReaderExecStream. The test cases will be added to both ExecStreamTestSuite and ParallelExecStreamSchedulerTest.

By adding SegBufferExecStreams on top of the SegBufferReaderExecStreams, that will exercise early closes, since a SegBufferExecStream closes its producer after it has read all of its input.

Farrago Unit Tests

A new test case will be added to FarragoOptRulesTest to exercise the new common relational subexpression instruction and rule.

Existing SQL tests should exercise common relational subexpression materialization for count-distinct aggregates and semijoins. Additional SQL statements will be added if the existing tests don't provide sufficient coverage.

Performance Results

As an initial test to measure the benefits of this change, I started off using TPC-H queries with 1G and 10GB dataset sizes. Queries 2, 10, and 16 will utilize the optimization for semijoins. Query 5 potentially could as well, but based on costing, the optimizer rule rejected the buffering. None of the TPC-H queries have both a count-distinct and regular aggregate. However, query 16 has a count-distinct aggregate, so I modified query 16 to include SUM(P_RETAILPRICE). That's noted as query 16a in the table below. As a result, query 16a uses buffering for both semijoins and count-distinct. To isolate the benefits of using buffering for only count-distinct, I modified query 16a to remove the IN filter on the PART table and the GROUP BY. Doing so prevents the query from using semijoins. That's noted as query 16b.

Of the 3 original queries, query 16 showed visible improvement for both dataset sizes. The two modified queries, 16a and 16b, also showed improvement for both dataset sizes. Query 10 showed improvement with a 10GB dataset size. The other cases showed no or marginal improvement. As you would expect, the cases where there's greatest improvement are those where the cost of executing the common relational subexpression is most expensive.

Query # 1 GB Dataset 10 GB Dataset
Without Buffering With Buffering  % Improvement Without Buffering With Buffering  % Improvement
2 9.1s 8.7s 4.4% 24.6s 25.6s -
10 18.4s 18s 2.2% 110.3s 94.2s 14.6%
16 13.5s 10.9s 19.3% 61.3s 38.8s 36.7%
16a 21.2s 13.8s 34.9% 95.4s 53.6s 43.8%
16b 15.6s 11.7s 25% 79.8s 48.2s 39.6%

These tests were run on the same hardware noted here. In the 1 GB case, the default buffer pool size of 5000 pages was used. In the 10 GB case, it was increased to 49,000 pages. Each query was run in its own standalone LucidDB server instance to prevent any previously cached data from skewing the results.

Modified Queries

Query 16a:

SELECT
    P_BRAND, P_TYPE, P_SIZE,
    COUNT(DISTINCT PS_SUPPKEY) AS SUPPLIER_CNT, SUM(P_RETAILPRICE)
FROM TPCH.PARTSUPP, TPCH.PART
WHERE
    P_PARTKEY = PS_PARTKEY AND P_BRAND <> 'Brand#45'
    AND P_TYPE NOT LIKE 'MEDIUM POLISHED%'
    AND P_SIZE IN (49, 14, 23, 45, 19, 3, 36, 9)
    AND PS_SUPPKEY NOT IN (
        SELECT S_SUPPKEY FROM TPCH.SUPPLIER
        WHERE S_COMMENT LIKE '%Customer%Complaints%')
GROUP BY P_BRAND, P_TYPE, P_SIZE
ORDER BY SUPPLIER_CNT DESC, P_BRAND, P_TYPE, P_SIZE;

Query 16b:

SELECT
    P_BRAND, P_TYPE, P_SIZE,
    COUNT(DISTINCT PS_SUPPKEY) AS SUPPLIER_CNT, SUM(P_RETAILPRICE)
FROM TPCH.PARTSUPP, TPCH.PART
WHERE
    P_PARTKEY = PS_PARTKEY AND P_BRAND <> 'Brand#45'
    AND P_TYPE NOT LIKE 'MEDIUM POLISHED%'
    AND PS_SUPPKEY NOT IN (
        SELECT S_SUPPKEY FROM TPCH.SUPPLIER
        WHERE S_COMMENT LIKE '%Customer%Complaints%');
Personal tools
Product Documentation