LucidDbCommonRelationalSubExpressionMaterialization
Overview
This document describes the enhancements necessary in LucidDB and its underlying Farrago and Fennel infrastructures to support materialization of common relational subexpressions. Common relational subexpressions refer to identical relational expressions (e.g., joins between the same set of tables or a scan on a particular table with a particular filter condition) that are executed more than once as part of executing a user query.
By materializing the first instance of the common relational subexpression, we can reuse that materialized result for subsequent instances of that subexpression within the same query. This is particularly beneficial if the common relational subexpression is complex, e.g., a join between several tables, and its result is small. Having a small result means less space and time will be required to store and read the materialized result. So, instead of incurring the cost of re-executing the common relational subexpression, we only incur the cost of storing the result and subsequently reading it.
Use Cases
Two examples that could benefit from common relational subexpression materialization are:
- queries with a count-distinct aggregate and some other aggregate
- queries involving semijoins
Aggregations
If a query contains a count-distinct plus any other aggregate, the relational expression that provides input into the aggregations needs to be executed twice, once for the count-distinct and once for the other aggregates. That's a result of the rewrite done by the optimizer rule RemoveDistinctAggregateRule. You can see the two instances by looking at the explain plan for the following query, which has a count-distinct and sum aggregate on a view. The view is a join between two tables, SALES and PRODUCT. Note that there are two LhxJoinRels in the plan, each corresponding to a join between SALES and PRODUCT. The RelNodes corresponding to the common relational subexpression are highlighted in bold.
'FennelToIteratorConverter'
'  FennelReshapeRel(projection=[[1, 0]], outputRowType=[RecordType(BIGINT NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL])'
'    FennelCartesianProductRel(leftouterjoin=[false])'
'      FennelAggRel(groupCount=[0], EXPR$1=[SUM(1)])'
'        FennelReshapeRel(projection=[[3, 1]], outputRowType=[RecordType(CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER QUANTITY) NOT NULL])'
'          LhxJoinRel(leftKeys=[[0]], rightKeys=[[2]], joinType=[INNER])'
'            LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[[1, 4]], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID, SYS$CLUSTERED_INDEX$SALES$QUANTITY]])'
'            FennelReshapeRel(projection=[[0, 1, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER CAST($0):INTEGER) NOT NULL])'
'              LcsRowScanRel(table=[[LOCALDB, SJ, PRODUCT]], projection=[[0, 1]], clustered indexes=[[SYS$CLUSTERED_INDEX$PRODUCT$ID, SYS$CLUSTERED_INDEX$PRODUCT$NAME]])'
'      FennelBufferRel(inMemory=[false], multiPass=[true])'
'        FennelAggRel(groupCount=[0], EXPR$0=[COUNT(0)])'
'          LhxAggRel(groupCount=[1])'
'            FennelReshapeRel(projection=[[2]], outputRowType=[RecordType(CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME) NOT NULL])'
'              LhxJoinRel(leftKeys=[[0]], rightKeys=[[2]], joinType=[INNER])'
'                LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[[1]], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID]])'
'                FennelReshapeRel(projection=[[0, 1, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NAME, INTEGER CAST($0):INTEGER) NOT NULL])'
'                  LcsRowScanRel(table=[[LOCALDB, SJ, PRODUCT]], projection=[[0, 1]], clustered indexes=[[SYS$CLUSTERED_INDEX$PRODUCT$ID, SYS$CLUSTERED_INDEX$PRODUCT$NAME]])'
By materializing the join between SALES and PRODUCT, we can avoid executing that join twice.
Semijoins
A semijoin can be used to process a star join between a fact and dimension table by using the dimension table to filter the fact table. Typically, the dimension table is filtered so it in turn can filter the fact table. Unless the join between the fact and dimension tables can be removed through this optimization, the scan of the dimension table will need to be done twice. This redundant execution may not be significant for a small dimension table. But if the dimension table is large or chained semijoins are being used, then materializing that portion of the query can be beneficial.
The following shows an example where a chained semijoin is used. STATE is filtered on the value 'New York', and in turn filters CUSTOMER, which finally filters the SALES fact table. The relational subexpression corresponding to the scan on STATE followed by the index lookup and scan on CUSTOMER is executed twice -- once to process the semijoin to filter SALES, and then to join SALES and CUSTOMER. Again, the common relational subexpression RelNodes are highlighted in bold.
explain plan for select * from sales s, state st, customer c where s.customer = c.id and c.city = st.city and st.state = 'New York';
'FennelToIteratorConverter'
'  FennelReshapeRel(projection=[[0, 1, 2, 3, 4, 8, 9, 5, 6, 7]], outputRowType=[RecordType(INTEGER SID, INTEGER PRODUCT_ID, INTEGER SALESPERSON, INTEGER CUSTOMER, INTEGER QUANTITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" STATE, INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" COMPANY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY0) NOT NULL])'
'    LhxJoinRel(leftKeys=[[3]], rightKeys=[[5]], joinType=[INNER])'
'      LcsRowScanRel(table=[[LOCALDB, SJ, SALES]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$SALES$CUSTOMER, SYS$CLUSTERED_INDEX$SALES$PRODUCT_ID, SYS$CLUSTERED_INDEX$SALES$QUANTITY, SYS$CLUSTERED_INDEX$SALES$SALESPERSON, SYS$CLUSTERED_INDEX$SALES$SID]])'
'        LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[2])'
'          LcsIndexSearchRel(table=[[LOCALDB, SJ, SALES]], index=[I_SALES_CUST], projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'            FennelSortRel(key=[[0]], discardDuplicates=[false])'
'              FennelReshapeRel(projection=[[0]], outputRowType=[RecordType(INTEGER ID) NOT NULL])'
'                LcsRowScanRel(table=[[LOCALDB, SJ, CUSTOMER]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$CUSTOMER$ID]])'
'                  LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[1])'
'                    LcsIndexSearchRel(table=[[LOCALDB, SJ, CUSTOMER]], index=[I_CUSTOMER_CITY], projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'                      FennelSortRel(key=[[0]], discardDuplicates=[false])'
'                        LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'                          FennelValuesRel(tuples=[[{ '[', 'New York ', ']', 'New York ' }]])'
'      FennelReshapeRel(projection=[[0, 1, 2, 3, 4, 0]], outputRowType=[RecordType(INTEGER NOT NULL ID, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" COMPANY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL CITY0, CHAR(20) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" STATE, INTEGER CAST($0):INTEGER) NOT NULL])'
'        LhxJoinRel(leftKeys=[[2]], rightKeys=[[0]], joinType=[INNER])'
'          LcsRowScanRel(table=[[LOCALDB, SJ, CUSTOMER]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$CUSTOMER$CITY, SYS$CLUSTERED_INDEX$CUSTOMER$COMPANY, SYS$CLUSTERED_INDEX$CUSTOMER$ID]])'
'            LcsIndexMergeRel(consumerSridParamId=[0], segmentLimitParamId=[0], ridLimitParamId=[1])'
'              LcsIndexSearchRel(table=[[LOCALDB, SJ, CUSTOMER]], index=[I_CUSTOMER_CITY], projection=[*], inputKeyProj=[*], inputDirectiveProj=[[]], startRidParamId=[0], rowLimitParamId=[0])'
'                FennelSortRel(key=[[0]], discardDuplicates=[false])'
'                  LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[[0]], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'                    FennelValuesRel(tuples=[[{ '[', 'New York ', ']', 'New York ' }]])'
'          LcsRowScanRel(table=[[LOCALDB, SJ, STATE]], projection=[*], clustered indexes=[[SYS$CLUSTERED_INDEX$STATE$CITY, SYS$CLUSTERED_INDEX$STATE$STATE]], residual columns=[[1]])'
'            FennelValuesRel(tuples=[[{ '[', 'New York ', ']', 'New York ' }]])'
Fennel Changes
New and Modified Classes
Fennel already has a class named SegBufferExecStream that buffers its input and then returns the buffered data as its output. The buffered data can be returned multiple times, which is required if the buffered data is the right hand input into a cartesian join.
What we need to materialize common relational subexpressions is to buffer an input and then make the buffered data available to multiple consumer streams. One design that was initially considered was to extend SegBufferExecStream, making it a DiffluenceExecStream, so it can return multiple outputs. In other words, assuming X is the input that needs to be buffered, MultiOutputSegBufferExecStream is an extension of SegBufferExecStream, and each input into CartesianJoinExecStream consumes the buffered data, the stream graph would look like the following:
    CartesianJoinExecStream
          /       \
  MultiOutputSegBufferExecStream
              |
              X
The problem with this design though is that it does not handle the case where the consumer of the buffered data needs to do restart-open of the buffered data, as is the case for a cartesian product join. It's not possible to restart one of the outputs without restarting both. (Nested loop joins also potentially have this problem.)
The design we decided to go with instead separates the functionality into two new streams -- a writer and a reader version of SegBufferExecStream. The writer stream will be named SegBufferWriterExecStream, and the reader stream will be named SegBufferReaderExecStream. Both new streams are subsets of the functionality currently in SegBufferExecStream. SegBufferWriterExecStream takes a single input and writes the input into a buffer, so it can be read by one or more SegBufferReaderExecStreams. It writes as its output a single tuple containing the first pageId of the buffered data it has created. That pageId is written to one or more consumer SegBufferReaderExecStreams. Each SegBufferReaderExecStream takes that input, reads the buffered data generated by the corresponding SegBufferWriterExecStream, and writes the buffered data it reads to its output. Having separate streams also makes the design cleaner.
For example, a simple stream graph with two SegBufferReaderExecStreams will look like the following.
                  MergeExecStream
                   /           \
  SegBufferReaderExecStream   SegBufferReaderExecStream
                ^                  ^
                 \                /
                  \              /
              SegBufferWriterExecStream
                         |
                   writer's input
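The split between one writer and several independent readers can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in and not Fennel code: PageStore, bufferInput, and BufferReader are invented names, a page is modeled as a vector of ints, and page id 0 is assumed to mean "no page". The point it tries to show is that the writer only needs to hand out the first pageId, and that each reader keeps its own cursor, so one reader can restart without affecting the others.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <vector>

// Hypothetical stand-ins for pages and the shared buffer storage.
using PageId = std::uint64_t;   // 0 is assumed to mean "no page"
using Page = std::vector<int>;

struct PageStore {
    std::map<PageId, Page> pages;
    std::map<PageId, PageId> next;  // page id -> id of the following page
    PageId nextId = 1;
};

// Writer side: buffers the whole input once and returns the first page id,
// which is the only value the readers need to receive.
inline PageId bufferInput(PageStore &store, const std::vector<Page> &input)
{
    PageId first = 0;
    PageId prev = 0;
    for (const Page &p : input) {
        PageId id = store.nextId++;
        store.pages[id] = p;
        if (prev == 0) {
            first = id;
        } else {
            store.next[prev] = id;
        }
        prev = id;
    }
    return first;
}

// Reader side: each reader owns its cursor into the shared buffer.
class BufferReader {
public:
    BufferReader(const PageStore &store, PageId first)
        : store_(store), first_(first), cur_(first) {}

    // Rewinds this reader only; other readers are unaffected.
    void restart() { cur_ = first_; }

    // Returns the next buffered page, or nullopt at the end of the buffer.
    std::optional<Page> read()
    {
        if (cur_ == 0) {
            return std::nullopt;
        }
        Page p = store_.pages.at(cur_);
        auto it = store_.next.find(cur_);
        cur_ = (it == store_.next.end()) ? 0 : it->second;
        return p;
    }

private:
    const PageStore &store_;
    PageId first_;
    PageId cur_;
};
```

With this shape, a cartesian-join style consumer could call restart() on its right-hand reader as often as it likes while the left-hand reader continues from wherever it was.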
Another variation of this design that was also considered was to communicate the pageId of the buffered data via a dynamic parameter instead of through the dataflow between the two streams. As a result, there would be no explicit dataflow between the two streams and only an implicit dataflow so that the streams are connected. However, having implicit dataflows requires adding new code to the schedulers, and you no longer have the explicit dataflows that the scheduler can use to decide which consumer to execute next after the writer stream has finished buffering its input.
Since there is existing code in SegBufferExecStream that can be reused by the new classes, the common code will be refactored into two new helper classes -- SegBufferReader and SegBufferWriter. Both helper classes will be used by SegBufferExecStream, while SegBufferWriterExecStream only needs a SegBufferWriter, and SegBufferReaderExecStream only needs a SegBufferReader.
One difference between the new classes and SegBufferExecStream is that the latter has an option that allows the buffered data to be regenerated when the stream is reopened in restart mode. The stream has a multipass parameter which needs to be set to false if those are the desired semantics. That option is not supported in the new streams because it may not be possible to destroy the buffered data when SegBufferWriterExecStream is reopened in restart mode. The buffered data may still be in use by SegBufferReaderExecStreams elsewhere in the stream graph. To support this, we would have to support multiple sets of active buffers, and keep track of which readers are using which buffers. Since the existing use cases don't need this functionality, to simplify things, we will not support it.
Scheduler Changes
As noted in the previous section, because we are not using implicit dataflows between the reader and writer streams, we do not need to modify the schedulers to handle implicit dataflows.
However, there is one change that is needed in DfsTreeExecStreamScheduler. Currently, when determining which consumer stream to execute after a producer stream has executed, the scheduler picks the first stream it finds whose incoming dataflow is in a non-underflow state. What we want instead is to give lower priority to streams whose incoming dataflow is in an empty state, as those are streams that have not explicitly pulled from their inputs. This makes a difference in some cases: executing a stream with an empty incoming dataflow ahead of a stream that has data in its incoming dataflow can result in a hang. Suppose the SegBufferReaderExecStream that initiated the request is upstream from a producer that's a sink in the middle of the stream graph, e.g., a producer that's the input into a UDX, and it's not in the first outgoing dataflow from the SegBufferWriterExecStream. Then that initiating SegBufferReaderExecStream will never get rescheduled. The SegBufferReaderExecStream which did not explicitly request data will be executed instead.
To address this, SegBufferWriterExecStream needs to write to its outputs that are in an underflow state before those that are in an empty state. And DfsTreeExecStreamScheduler::findNextConsumer needs to ignore empty dataflows if there are other available non-underflow dataflows.
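The two priority rules can be sketched as follows. This is an illustration under assumptions, not the scheduler's code: the FlowState enum is a simplified, hypothetical version of a dataflow buffer state, and both functions operate on a plain vector of states, one per outgoing dataflow of the producer.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, simplified dataflow states (not Fennel's actual enum).
enum class FlowState { Empty, NonEmpty, Underflow, Overflow, EndOfStream };

// Writer side: pick the output to write to first. Outputs whose consumer
// explicitly asked for data (Underflow) win over outputs that are merely
// Empty. Returns -1 if no output is in either state.
inline int pickOutputToWrite(const std::vector<FlowState> &outFlows)
{
    int firstEmpty = -1;
    for (std::size_t i = 0; i < outFlows.size(); ++i) {
        if (outFlows[i] == FlowState::Underflow) {
            return static_cast<int>(i);
        }
        if (outFlows[i] == FlowState::Empty && firstEmpty < 0) {
            firstEmpty = static_cast<int>(i);
        }
    }
    return firstEmpty;
}

// Scheduler side: after the producer has run, pick the consumer to run
// next. Underflow dataflows are skipped, and an Empty dataflow is chosen
// only when no other non-underflow dataflow is available.
// Returns -1 if every dataflow is in underflow.
inline int findNextConsumer(const std::vector<FlowState> &outFlows)
{
    int firstEmpty = -1;
    for (std::size_t i = 0; i < outFlows.size(); ++i) {
        if (outFlows[i] == FlowState::Underflow) {
            continue;
        }
        if (outFlows[i] == FlowState::Empty) {
            if (firstEmpty < 0) {
                firstEmpty = static_cast<int>(i);
            }
            continue;
        }
        return static_cast<int>(i);
    }
    return firstEmpty;
}
```

In the hang scenario described above, the initiating reader sits on a later outgoing dataflow. Once the writer has written to it, that dataflow is non-empty and is chosen ahead of an earlier dataflow that is still empty.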
The parallel scheduler (ParallelExecStreamScheduler) does not need to be changed because it already schedules all non-underflow consumers to execute after a stream has produced.
Early Closes
Fennel supports early closes. This allows an execution stream to close all of its producers and their producers after it has finished some part of its execution. For example, ExternalSortExecStream does an early close as soon as it has finished reading all of its input and is ready to start sorting the data. By closing certain producers early instead of waiting for the entire stream graph to be closed, resources associated with those early-closing producers can be freed up early.
Early closes are also necessary to make certain types of queries work correctly. For example, if the underlying data storage does not support page versioning, a self-insert statement like the following:
insert into tab select * from tab;
requires an early close after the select portion of the stream graph has finished processing all its data. That's because the select portion of the stream is holding a shared lock on the last page it has read. The insert portion of the stream graph may need to insert data into that same page and will wait until it can acquire exclusive access to the page. This results in the statement hanging. Doing an early close releases the shared lock and allows the insert to acquire the exclusive lock.
Since the SegBufferWriterExecStream creates the buffered data that's read by the reader streams, it also makes sense for it to destroy it. However, it cannot destroy the data until all of its readers have finished executing. Ignoring early closes for the moment, that works because readers end up being closed before the corresponding writer and therefore, there will be no active readers of the buffered data when the SegBufferWriterExecStream is closed.
Early closes, however, pose problems.
Consider the following stream, where the SBW is a SegBufferWriterExecStream and the SBRs are corresponding SegBufferReaderExecStreams.
SBW -----> SBR --
 |               \
 |                Y ----- Z
 |               /       /
 |           X --  SBR --
 |                  ^
 |                  |
 |-------------------
If Y initiates an early close, because SBW is connected to the top SBR, that results in the SegBufferWriterExecStream being closed, which results in the buffered data that's shared being destroyed, even though the bottom SegBufferReaderExecStream may not have finished, or even started, reading the buffered data.
To address early closes, a dynamic parameter will be shared between a SegBufferWriterExecStream and its corresponding SegBufferReaderExecStreams. That parameter will be used as a reference counter to keep track of the number of active readers of the buffer generated by the SegBufferWriterExecStream. The counter will be incremented whenever any of the SegBufferReaderExecStreams are opened, and it will be decremented whenever they are closed. If the reference count is non-zero when an attempt is made to early close a SegBufferWriterExecStream, the early close on the SegBufferWriterExecStream will be ignored. A new method canEarlyClose will be added to ExecStream that returns true by default. SegBufferWriterExecStream will override the method and return a value based on the value of the reference counter.
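A minimal sketch of the reference-counting idea follows. Only the canEarlyClose hook comes from the text; the Stream, WriterStream, and ReaderStream classes are hypothetical simplifications, and the shared counter is modeled as a plain std::atomic rather than a dynamic parameter. Guarding against counting the same reader twice when it is opened again is an assumption of this sketch.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical base class; by default any stream may be closed early.
class Stream {
public:
    virtual ~Stream() = default;
    virtual bool canEarlyClose() const { return true; }
};

// Writer: refuses an early close while any reader is still active.
class WriterStream : public Stream {
public:
    explicit WriterStream(std::atomic<std::int64_t> &readerCount)
        : readerCount_(readerCount) {}

    bool canEarlyClose() const override { return readerCount_.load() == 0; }

private:
    std::atomic<std::int64_t> &readerCount_;
};

// Reader: bumps the shared counter on open and drops it on close.
class ReaderStream : public Stream {
public:
    explicit ReaderStream(std::atomic<std::int64_t> &readerCount)
        : readerCount_(readerCount) {}

    void open()
    {
        // Assumption: opening an already-open reader does not count twice.
        if (!isOpen_) {
            ++readerCount_;
            isOpen_ = true;
        }
    }

    void close()
    {
        if (isOpen_) {
            --readerCount_;
            isOpen_ = false;
        }
    }

private:
    std::atomic<std::int64_t> &readerCount_;
    bool isOpen_ = false;
};
```

In the Y/Z example above, the bottom reader would still hold a count of one when Y initiates its early close, so the writer would report that it cannot be closed yet and the shared buffer would survive.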
New Dynamic Parameter Counters
To support dynamic parameters that are used as reference counters, the following methods will be added to the DynamicParamManager class:
/**
 * Creates a new dynamic parameter that will be used as a counter.
 * Initializes the parameter value to 0.
 *
 * @param dynamicParamId unique ID of parameter within this manager; IDs
 * need not be contiguous, and must be assigned by some other authority
 *
 * @param failIfExists if true (the default) an assertion failure
 * will occur if dynamicParamId is already in use
 */
void createCounterParam(
    DynamicParamId dynamicParamId,
    bool failIfExists = true);

/**
 * Increments a dynamic parameter that corresponds to a counter.
 *
 * @param dynamicParamId ID with which the counter parameter was created
 */
void incrementCounterParam(DynamicParamId dynamicParamId);

/**
 * Decrements a dynamic parameter that corresponds to a counter.
 *
 * @param dynamicParamId ID with which the counter parameter was created
 */
void decrementCounterParam(DynamicParamId dynamicParamId);
Dynamic parameters that are created as counters are different from existing dynamic parameters in that the new increment and decrement methods can be called on them. The increment and decrement are atomic operations. The existing methods to read and set dynamic parameter values can also be used on them. Their values will be stored as 8-byte signed integers. Lastly, their ids co-exist with the ids of regular dynamic parameters. In other words, once you've assigned a dynamic parameter id to a counter parameter, you cannot use that same id for a different parameter, even if that other parameter is a non-counter.
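The semantics described above (shared id space, 8-byte signed values, atomic increment and decrement) can be illustrated with a toy manager. ToyParamManager and its createParam and read methods are hypothetical and are not the DynamicParamManager interface. The sketch uses a mutex for atomicity and throws std::logic_error where the real class is described as raising an assertion failure, purely so the behavior can be observed.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <stdexcept>

// Toy model: counters and regular parameters live in one id space.
class ToyParamManager {
public:
    // Hypothetical way to create a regular (non-counter) parameter.
    void createParam(int id, std::int64_t value) { create(id, value, false); }

    // Counter parameters always start at 0.
    void createCounterParam(int id) { create(id, 0, true); }

    void incrementCounterParam(int id) { adjust(id, 1); }
    void decrementCounterParam(int id) { adjust(id, -1); }

    // Reading works the same way for counters and regular parameters.
    std::int64_t read(int id)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return params_.at(id).value;
    }

private:
    struct Param {
        std::int64_t value;  // stored as an 8-byte signed integer
        bool isCounter;
    };

    void create(int id, std::int64_t value, bool isCounter)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        // An id already taken by any kind of parameter cannot be reused.
        if (!params_.emplace(id, Param{value, isCounter}).second) {
            throw std::logic_error("dynamic parameter id already in use");
        }
    }

    void adjust(int id, std::int64_t delta)
    {
        // The lock makes the read-modify-write a single atomic step.
        std::lock_guard<std::mutex> guard(mutex_);
        Param &p = params_.at(id);
        if (!p.isCounter) {
            throw std::logic_error("not a counter parameter");
        }
        p.value += delta;
    }

    std::mutex mutex_;
    std::map<int, Param> params_;
};
```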
Farrago Changes
Hep Changes
LucidDB uses Hep to optimize queries. Hep executes HepInstructions, which correspond to some collection of optimizer rules. A new type of instruction will be added to Hep, corresponding to rules that will only be applied on common relational subexpressions.
Common relational subexpressions will be detected by examining the query graph that Hep builds. Each vertex in the graph corresponds to a relational expression in the query. Directed edges connect the vertices together representing parent-child relationships between the different relational expressions. Any vertex in the graph that has more than one parent vertex is a common relational subexpression because it means that the subexpression appears in multiple places in the graph. Note, however, that if the same relational expression appears as multiple inputs into the same parent, there is only a single edge from the parent vertex to the child vertex. So, simply counting the number of edges into a vertex is not sufficient.
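One way to picture the detection is to count uses from each parent's input list instead of counting graph edges. The sketch below assumes a hypothetical PlanGraph type that maps a vertex name to its ordered list of inputs, where the same child may appear more than once in one list. It is not Hep's data structure.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical plan graph: vertex name -> ordered list of input vertices.
// A child used twice by one parent appears twice in that parent's list,
// even though the graph would hold only one edge between them.
using PlanGraph = std::map<std::string, std::vector<std::string>>;

// Counts how many times each vertex is consumed as an input.
inline std::map<std::string, int> countUses(const PlanGraph &graph)
{
    std::map<std::string, int> uses;
    for (const auto &entry : graph) {
        for (const std::string &child : entry.second) {
            ++uses[child];
        }
    }
    return uses;
}

// A vertex consumed more than once is a common relational subexpression.
inline std::vector<std::string> commonSubexpressions(const PlanGraph &graph)
{
    std::vector<std::string> result;
    for (const auto &use : countUses(graph)) {
        if (use.second > 1) {
            result.push_back(use.first);
        }
    }
    return result;
}
```

A self-join where both inputs are the same scan is the case that edge counting would miss: the scan has a single parent, yet it is consumed twice.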
A new abstract base class named CommonRelSubExprRule will be created. The rule will extend from RelOptRule. Any rules that potentially can be applied on common relational subexpressions will derive from this new base class.
Hep will be extended so the new instruction can be added to the appropriate places in a HepProgram. When the new instruction is invoked, Hep will attempt to pattern match any CommonRelSubExprRules against subgraphs where the root vertex is a common relational subexpression. Once a match is found, Hep will pass along a list of the parent vertices corresponding to the common relational subexpression to the matching rule. The CommonRelSubExprRule can use this additional information in its decision making logic. For example, from the list of parent vertices, it can determine how many times the common relational subexpression is used and use that number in some cost calculation.
Converting the Common Relational Subexpressions
A new rule named BufferCommonRelSubExprRule will derive from CommonRelSubExprRule. The rule simply matches on any RelNode.
Based on the number of times the common relational subexpression is used (N), the rule will compare the cost of executing the common relational subexpression N times versus the cost of executing it once, buffering the result, and reading the buffered result N times. If the rule determines that buffering is beneficial, it will place a FennelMultiUseBufferRel on top of the original relational expression. The new class FennelMultiUseBufferRel will be derived from the existing FennelBufferRel class.
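The comparison can be written out as a small function. This is only a sketch of the arithmetic described above: the BufferCosts struct and shouldBuffer function are hypothetical names, the costs are abstract units, and how the optimizer actually estimates each component is not specified here.

```cpp
// Hypothetical cost components, in arbitrary cost units.
struct BufferCosts {
    double execute;  // cost of executing the subexpression once
    double write;    // cost of writing its result to the buffer
    double read;     // cost of one reader reading the buffered result
};

// Returns true when executing once, buffering, and reading N times is
// cheaper than executing the subexpression N times.
inline bool shouldBuffer(const BufferCosts &costs, int uses)
{
    if (uses < 2) {
        return false;  // not a common subexpression
    }
    double withoutBuffering = uses * costs.execute;
    double withBuffering = costs.execute + costs.write + uses * costs.read;
    return withBuffering < withoutBuffering;
}
```

For example, with an execution cost of 100, a write cost of 10, a read cost of 5, and two uses, buffering costs 120 against 200 without it. With an execution cost of 10, a write cost of 8, and a read cost of 6, buffering costs 30 against 20, so the subexpression is better left alone.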
To avoid an infinite loop, the rule is a no-op if the matching RelNode is already a FennelMultiUseBufferRel.
Note that when the FennelMultiUseBufferRel is created, there is no distinction yet between whether the FennelMultiUseBufferRel is being used as a reader or writer. That distinction will not be made until either FennelMultiUseBufferRel.toStreamDef or FennelMultiUseBufferRel.implementFennelChild is called.
For each FennelMultiUseBufferRel object, we want to create a single writer stream and one reader stream for each reference to that object in the stream graph. The child input into the FennelMultiUseBufferRel will only feed into the writer stream. That means then that we only want to call the method pairs FennelMultiUseBufferRel.visitFennelChild and FennelMultiUseBufferRel.implementFennelChild once for the child input of a FennelMultiUseBufferRel object. Thus, those calls should only correspond to a writer instance of a streamDef. Note that when a stream graph has a mix of Fennel and Java RelNodes, there need to be matching calls to visitFennelChild and implementFennelChild. Otherwise, if implementFennelChild is called, but there is no corresponding visitFennelChild call, this will result in dangling FarragoTransformDefs.
It might seem like doing the extra book-keeping necessary to avoid the extra calls is superfluous, and we can always call visitFennelChild/implementFennelChild multiple times for a FennelMultiUseBufferRel object and simply throw away the results. However, consider the following query tree, where the "B"s are the buffering nodes.
      Z
     / \
    Y   B1
   / \
  B2  B2
  |   |
  X   X
  |   |
  B1  B1
If we were to call implementFennelChild on every node, but throw away the unneeded results from the visitFennelChild calls, we would end up with the following stream graph. The "R"s are the reader streams, and the "W"s are the writer streams.
        Z
       / \
      Y   \
     / \   \
   R2   R2  \
     \ /     \
      W2      |
      |       |
      X   X   |
      |   |   |
      R1  R1  R1
       \ /    |
        W1----|
Note that the rightmost X ends up producing data to nowhere. That's because when visitFennelChild was called on the rightmost B2, that resulted in the rightmost X-R1 stream being created. Since W1 is the writer stream for B1, a dataflow is added from W1 to all of the "R1"s, resulting in the dangling streamDef.
Deciding which FennelMultiUseBufferRel instance should be assigned the role of writer will be based on the instance that calls either implementFennelChild or toStreamDef first. Both calls need to be taken into account. If we make the assignment based on only one of the two calls, then that doesn't work for a case like the following. Again, the "B"s are the FennelMultiUseBufferRels, and the "IFC"s are IteratorToFennelConverters.
      -- X --
     /       \
    B         Y
    |        / \
  IFC1    IFC2  Z
            |
            B
            |
          IFC1
If we assign the writer based on the instance that calls toStreamDef first, then the lower B, which is a child of an IteratorToFennelConverter, ends up calling toStreamDef first, and is therefore assigned the writer role. However, implementFennelChild was already called on the topmost B. Thus, when converting IFC2 to a streamDef, we end up with mismatched RelPathEntrys because we only have the RelPathEntry list corresponding to the topmost B, rather than the desired lower one.
Doing the reverse also doesn't solve the problem. If we assign the writer based on the instance that calls implementFennelChild first, then the topmost B is the writer, but because toStreamDef ends up being called first on the lower B, we again end up with a mismatch.
To properly assign the writer role, we'll need to extend FarragoRelImplementor so it maintains two new maps:
- A map from a RelNode to its RelPathEntry list at the time of the first call to either toStreamDef or implementFennelChild. Once the first call has been made, subsequent calls to those methods will not modify the map.
- A map from a RelNode to the list of FemExecutionStreamDefs that it has been converted to.
The first map will be maintained as part of a new interface method that will be added to FennelRelImplementor -- isFirstTranslationInstance. FennelMultiUseBufferRel will call this new method in both its toStreamDef and implementFennelChild methods. If the call returns true, then we know that the current instance is the writer instance.
The second map will be maintained as part of the existing call to FennelRelImplementor.registerRelStreamDef. A new method FennelRelImplementor.getRegisteredStreamDefs(RelNode) will be added to retrieve the mapping.
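The first map can be pictured with a language-neutral sketch, written in C++ here even though the Farrago side is Java. TranslationTracker, RelId, and RelPath are hypothetical stand-ins: a RelNode is reduced to an integer id and its RelPathEntry list to a vector of integers. The sketch assumes that "first instance" means the current path equals the path recorded on the first call, which is one plausible reading of the description above.

```cpp
#include <map>
#include <vector>

using RelId = int;                  // hypothetical stand-in for a RelNode
using RelPath = std::vector<int>;   // hypothetical stand-in for a RelPathEntry list

class TranslationTracker {
public:
    // Records the path seen on the first call for a rel. Later calls leave
    // the map untouched and report whether they come from that same path.
    bool isFirstTranslationInstance(RelId rel, const RelPath &currentPath)
    {
        // emplace does nothing if an entry for rel already exists.
        auto inserted = firstPath_.emplace(rel, currentPath);
        return inserted.first->second == currentPath;
    }

private:
    std::map<RelId, RelPath> firstPath_;
};
```

Under this reading, it does not matter whether toStreamDef or implementFennelChild reaches the tracker first. Whichever path gets there first becomes the writer, and the other method, when later called along the same path, gets the same answer.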
In FennelMultiUseBufferRel.implementFennelChild, if the instance is not the writer, then implementFennelChild is not called on the child. In FennelMultiUseBufferRel.toStreamDef, if the instance is the writer, then both a writer and reader stream will be created. Otherwise, only a reader stream will be created. If a writer stream hasn't yet been created when a reader stream is created, we'll have to defer setting the incoming dataflows and dynamic parameter for the stream until later when we have the writer stream available. That's why we need to keep track of all of the streams that have already been created for the FennelMultiUseBufferRel object. Once we've created the writer stream, then we will need to go back and create the dataflows from the newly created writer to those previously created readers.
Also, only if the current instance has created a writer will we call visitFennelChild on the child input and create a dataflow from the input to the writer.
Applying the Common Relational Subexpression Rule
LucidDB's Hep program will need to be extended to add the new instruction for common relational subexpression rules. It would seem that, for simplicity, we could just apply the rule once late in the Hep program to allow other optimizer rules to do all the necessary conversions to create the full range of common relational subexpressions. For example, it would not make sense to apply the instruction before RemoveDistinctAggregateRule is called because the common relational subexpression corresponding to the aggregate inputs has not been created yet.
However, applying the rule too late also does not work. To see why, study the explain outputs from the use case section. For the aggregation example, look at the LcsRowScanRels that scan on SALES. Notice that they have different projections. The first is [[1, 4]] while the second is only [[1]]. As a result, the two relational expressions corresponding to those joins are not the same.
The problem also occurs in the semijoin example. Note that the projections in the row scans on CUSTOMER are also different. The one used as part of the semijoin only projects a single column while the one that's part of the join projects all columns.
To ensure that there are common relational subexpressions that will allow count-distinct aggregations and semijoins to be optimized, the common relational subexpression instruction needs to be applied according to the following criteria:
- The new rule needs to be applied after the rules that add the common relational subexpressions (RemoveDistinctAggregateRule and LcsIndexSemiJoinRule) have been applied.
- After the common relational subexpressions have been introduced and before the new rule is applied, there cannot be any pushdowns of projections that will result in eliminating the common relational subexpressions.
- The new rule needs to be applied after rules that remove joins (LoptOptimizeJoinRule and LoptRemoveSelfJoinRule) are applied. Otherwise, if there are common relational subexpressions that participate in joins that are removed by either of these rules, then we could end up buffering a relational subexpression that ends up only being used once in the final query tree. Or worse, the buffering could prevent the join from being removed.
- The new rule needs to be applied after filters have been pushed down. Otherwise, a common relational subexpression corresponding to a cartesian product join could be buffered.
To satisfy the above criteria, the following will be done:
- We'll add the new instruction call once -- after semijoin-related rules are applied. Since semijoin-related rules are applied after the join ordering and removal rules, that satisfies criteria 3 and 4.
- Move the application of RemoveDistinctAggregateRule so it's done later, but before the join ordering rules are applied, since RemoveDistinctAggregateRule can introduce a new join. This also needs to be done so criterion 2 is met. Currently, RemoveDistinctAggregateRule is applied very early in LucidDB's Hep program. In order to meet criterion 2, had we left the rule where it currently is, we would have had to add a second instruction call to locate common relational subexpressions early in the Hep program. Doing so, however, would violate criteria 3 and 4.
These changes will address both use cases. In the future, as additional use cases for materializing common relational subexpressions are identified, if the existing instruction call is insufficient to detect those common relational subexpressions, then further tweaking of the instruction calls will be required.
Unit Tests
Fennel Unit Tests
New tests will be added to ExecStreamTestSuite to exercise the new Fennel classes. We can use the following simple stream graphs in the test cases.
                  MergeExecStream
                   /           \
  SegBufferReaderExecStream   SegBufferReaderExecStream
                ^                  ^
                 \                /
              SegBufferWriterExecStream
                         |
               MockProducerExecStream

              CartesianJoinExecStream
                   /           \
  SegBufferReaderExecStream   SegBufferReaderExecStream
                ^                  ^
                 \                /
              SegBufferWriterExecStream
                         |
               MockProducerExecStream
The stream graph with CartesianJoinExecStream will exercise restart-opens on the SegBufferReaderExecStream. The test cases will be added to both ExecStreamTestSuite and ParallelExecStreamSchedulerTest.
Adding SegBufferExecStreams on top of the SegBufferReaderExecStreams will exercise early closes, since a SegBufferExecStream closes its producer after it has read all of its input.
Farrago Unit Tests
A new test case will be added to FarragoOptRulesTest to exercise the new common relational subexpression instruction and rule.
Existing SQL tests should exercise common relational subexpression materialization for count-distinct aggregates and semijoins. Additional SQL statements will be added if the existing tests don't provide sufficient coverage.
Performance Results
As an initial test to measure the benefits of this change, I started off using TPC-H queries with 1 GB and 10 GB dataset sizes. Queries 2, 10, and 16 will utilize the optimization for semijoins. Query 5 potentially could as well, but based on costing, the optimizer rule rejected the buffering. None of the TPC-H queries have both a count-distinct and regular aggregate. However, query 16 has a count-distinct aggregate, so I modified query 16 to include SUM(P_RETAILPRICE). That's noted as query 16a in the table below. As a result, query 16a uses buffering for both semijoins and count-distinct. To isolate the benefits of using buffering for only count-distinct, I modified query 16a to remove the IN filter on the PART table and the GROUP BY. Doing so prevents the query from using semijoins. That's noted as query 16b.
Of the 3 original queries, query 16 showed visible improvement for both dataset sizes. The two modified queries, 16a and 16b, also showed improvement for both dataset sizes. Query 10 showed improvement with a 10 GB dataset size. The other cases showed no or marginal improvement. As you would expect, the cases with the greatest improvement are those where the cost of executing the common relational subexpression is highest.
Query # | 1 GB: Without Buffering | 1 GB: With Buffering | 1 GB: % Improvement | 10 GB: Without Buffering | 10 GB: With Buffering | 10 GB: % Improvement |
2 | 9.1s | 8.7s | 4.4% | 24.6s | 25.6s | - |
10 | 18.4s | 18s | 2.2% | 110.3s | 94.2s | 14.6% |
16 | 13.5s | 10.9s | 19.3% | 61.3s | 38.8s | 36.7% |
16a | 21.2s | 13.8s | 34.9% | 95.4s | 53.6s | 43.8% |
16b | 15.6s | 11.7s | 25% | 79.8s | 48.2s | 39.6% |
These tests were run on the same hardware noted here. In the 1 GB case, the default buffer pool size of 5000 pages was used. In the 10 GB case, it was increased to 49,000 pages. Each query was run in its own standalone LucidDB server instance to prevent any previously cached data from skewing the results.
Modified Queries
Query 16a:
SELECT
    P_BRAND,
    P_TYPE,
    P_SIZE,
    COUNT(DISTINCT PS_SUPPKEY) AS SUPPLIER_CNT,
    SUM(P_RETAILPRICE)
FROM
    TPCH.PARTSUPP,
    TPCH.PART
WHERE
    P_PARTKEY = PS_PARTKEY
    AND P_BRAND <> 'Brand#45'
    AND P_TYPE NOT LIKE 'MEDIUM POLISHED%'
    AND P_SIZE IN (49, 14, 23, 45, 19, 3, 36, 9)
    AND PS_SUPPKEY NOT IN (
        SELECT S_SUPPKEY
        FROM TPCH.SUPPLIER
        WHERE S_COMMENT LIKE '%Customer%Complaints%')
GROUP BY
    P_BRAND,
    P_TYPE,
    P_SIZE
ORDER BY
    SUPPLIER_CNT DESC,
    P_BRAND,
    P_TYPE,
    P_SIZE;
Query 16b:
SELECT
    P_BRAND,
    P_TYPE,
    P_SIZE,
    COUNT(DISTINCT PS_SUPPKEY) AS SUPPLIER_CNT,
    SUM(P_RETAILPRICE)
FROM
    TPCH.PARTSUPP,
    TPCH.PART
WHERE
    P_PARTKEY = PS_PARTKEY
    AND P_BRAND <> 'Brand#45'
    AND P_TYPE NOT LIKE 'MEDIUM POLISHED%'
    AND PS_SUPPKEY NOT IN (
        SELECT S_SUPPKEY
        FROM TPCH.SUPPLIER
        WHERE S_COMMENT LIKE '%Customer%Complaints%');