Reporting in High-Volume Service-Based Applications
By Matthew Rapaport
I recently completed a 10-month contract with an early innovator in mobile service-based application delivery. While such services have been available in many versions for some time, this company’s services were unique in that they all originated in coordinated data centers. The service preserved a “user’s state” across all applications (ensuring that the destruction or loss of a particular telephone would NOT cause loss of data for the user), and supported true multitasking across applications. A user could talk on the phone while downloading a music video, using instant messenger, and searching the web, all at the same time. All of these services were the product of some 4,000 servers running in 2 data centers, handling transactions for over 1.5 million users, often with a million or more connected to the service simultaneously.
The challenge of reporting in such environments is that you cannot simply store what comes from the service to a “reporting database” and then find different uses for the data after the fact. While the technology exists for such massive storage and retrieval, it is expensive, and especially so for a small company for whom reporting per se is not a part of its core business. This company naturally focused its attention and support on services delivered to its subscribers. Reporting was important, but did not warrant costs and storage architectures that competed in size and expense with the service itself.
A given service was associated with many individual servers. For example, on any given day, 500 or more machines supported web services. Each machine (a blade server in a rack) produced a daily (for some services hourly) log file, and each log identified the machine from which it came. A given user (the fundamental unit of reporting) might appear in many log records on a given machine and also on multiple machines as he or she used the service at various times throughout the day. The subscriber population easily generated some 20 million log records a day, not counting non-user-related “server monitoring” data.
Each log contained two basic record types: records that had to do with errors encountered by the service, and records about the user. The error-related data, processed in a separate workflow step, was not sent to the reporting database. As we will see, this kind of separation is a principal strategy in dealing with large data volumes. In this case, the original designer made a mistake by putting error-data parsing AFTER user-data parsing, redundantly iterating over the same raw data. Much simpler and more efficient would have been a filter layer that split the raw data into two separate streams, each processed by its own program and each now a much smaller data set.
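A minimal Python sketch of the filter layer suggested above. The record layout (tab-separated lines with a leading "ERR" or "USR" tag) and the function name are purely hypothetical, since the real log format is not described here. The point is only that one pass over the raw data routes each record to exactly one of two smaller streams.

```python
import io


def split_log(raw_lines, error_out, user_out):
    """Route each raw record to one of two output streams in a single pass."""
    counts = {"error": 0, "user": 0}
    for line in raw_lines:
        # Hypothetical convention: error records carry a leading "ERR" tag.
        if line.startswith("ERR\t"):
            error_out.write(line)
            counts["error"] += 1
        else:
            user_out.write(line)
            counts["user"] += 1
    return counts


# In-memory stand-ins for one raw log file and the two downstream streams.
raw = io.StringIO(
    "USR\tu1001\tweb\t3\n"
    "ERR\tblade17\ttimeout\n"
    "USR\tu1002\tweb\t1\n"
)
errors, users = io.StringIO(), io.StringIO()
counts = split_log(raw, errors, users)
print(counts)
```

Each downstream program then reads only its own stream, so neither one iterates over records it will discard.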
The operational model (how the original reporting software ran) was a good one, and we followed it in our redevelopment of the reporting software. The software ran asynchronously, with multiple instances of itself and the data. The reporting team did not control the flow of raw data from the service machines to the block of servers that processed them. Data could be late, sometimes by as much as several days, and a few logs might accidentally appear on the reporting servers more than once. I designed the new system with such asynchronicity in mind. Given log-types were directed at certain reporting servers, but servers could be added to the processing stream for any log type, balancing daily variations in log volume. Similarly, any parser running in the parsing cluster handled any log and record type. The original parser (fig 1a) handled every processing step: reading in the raw logs, cleaning and transforming the data, and inserting directly into the DBMS by means of the Perl DBI.
A file locking scheme assured that no two processes handled the same log simultaneously. The workflow and the parsing code allowed many parser instances to run over the same log types simultaneously. This flexibility led to certain issues at the DBMS layer. If the basic unit of reporting was the user, and multiple processes might be handling records for the same user at the same time, how would values in the DBMS merge properly? By defining the record keys to include the individual server, and consolidating the individual user log records for a given log in one totaled up record, all DBMS transactions became inserts. Accidental processing of duplicated logs would result in primary key violations wasting time, but otherwise doing no harm. This choice simplified DBMS administration. It worked, up to about a million users, when the whole collapsed of its own weight.
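The insert-only scheme described above can be illustrated with a small Python sketch. The key layout (day, server, user), the sample values, and the dictionary standing in for a DBMS table are all assumptions made for illustration. It shows both properties mentioned: a duplicated log is rejected by the primary key without harm, and a user seen on several servers ends up with several rows.

```python
from collections import Counter


def consolidate(log_records, day, server):
    """Collapse one log's user records into one totaled row per (day, server, user)."""
    totals = Counter()
    for user_id, hits in log_records:
        totals[(day, server, user_id)] += hits
    return totals


# Two logs from different servers, both mentioning user u1 (hypothetical data).
log_a = [("u1", 2), ("u2", 1), ("u1", 3)]
log_b = [("u1", 4)]

table = {}      # stands in for the DBMS table: primary key -> value
rejected = 0    # counts simulated primary key violations

# log_a is deliberately processed twice to mimic a duplicated log.
for server, log in (("blade01", log_a), ("blade02", log_b), ("blade01", log_a)):
    for key, hits in consolidate(log, "2008-01-15", server).items():
        if key in table:
            rejected += 1   # wasted work, but the stored data is unchanged
            continue
        table[key] = hits

rows_for_u1 = [key for key in table if key[2] == "u1"]
print(len(rows_for_u1), rejected)
```

Because the server is part of the key, user u1 occupies one row per server rather than one row per day, which is the multiplication of records that eventually overwhelmed the system.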
On its front end, loading a day’s worth of data began to need more than a day! This was initially mitigated by adding machines to the parsing cluster, which in turn overloaded the DBMS. The DBMS was then split into multiple instances, each separately loaded, a tactic that required a scheme for discovering in which database to find a particular table. Ultimately the back end, the programs that had to read all this data from the database and perform joins, unions, and subtractions on the tables to produce reports, halted under the enormous load. Even with optimized tables the reports took too long, resulting in “snapshot too old” errors more and more often, until few of the monthly reports ran at all.
The fundamental problem, thanks to the number of keys, was that “insert only” caused storage of somewhere between 5 and 20 times more records than necessary. What should have been a maximum of 300 million or so writes (users times core-tables) each day across the entire DBMS turned into 3 – 5 billion writes! On the back end, the reporting side, the code was trying to join on daily periods across these large tables spread across a dozen Oracle instances. Even segmented by day there were billions of records, and the broken reports were those having to sum days across a month!
I had the luxury of implementing two different solutions to this problem. Both followed a strategy of separating the data flow from raw log to DBMS table into multiple steps, each step handing the next a smaller data-set with which to work (fig 2). The company’s architect dictated the first approach (fig 1b). It noted that data, stored chiefly at the user level, was totaled for reporting at the telephone carrier level. The approach promised enormous savings, on the order of two to three orders of magnitude in the number of daily writes to the DBMS. Unfortunately, while 75% of the reports did ignore individual user data, the other 25%, including all the histograms, needed it.
In keeping with the original version of the reporting service, the first implementation, a Perl-only solution, involved 10 of the 20 core reporting tables. Characteristic of this group of reports was that each involved only ONE primary table. Every report also involved a join with user-device and user-partner data (a partner being a particular telephone carrier) obtained directly from a “service database”. The target table of this “user join”, loaded daily (the “day” being part of its key), resulted in 500 million new records a month in a high-traffic table used in every report!
I discovered that the user-partner relationship was fixed. A user ID, once assigned to a partner, was never re-used. Furthermore, the user-device relationship, while changeable (a user could buy a new device), changed only rarely. I re-factored this bottle-neck table into two: a user-partner table containing fixed user-partner records, and a user-device table that added new records for a given user only when that user actually changed devices. This reduced writes to this table from 1.5 million each day to a few thousand, turning a “core table” into proper “reference tables”. Characteristically, many queries use reference tables. For this reason, a good reference table is small compared with the core tables, which helps the performance of joins involving it.
Having made this change, we systematically altered the “where clauses” in all the reporting scripts to change the joins to use these new tables. The user-partner change was trivial. The user-device change (needed in but a few reports) required some “latest date earlier than the report date” logic, but Oracle’s built-in functions made this easy as well. This change alone brought enough of the reports to life that operations were able to supply contractually obligated data while I finished the new software.
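The “latest date earlier than the report date” lookup can be sketched in Python as follows. This is a stand-in for illustration only: the production logic lived in Oracle SQL, and the table contents, names, and the choice to treat a change made on the report date itself as already effective are all assumptions.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical user-device reference data: one entry per device change,
# kept sorted by effective date for each user.
user_device = {
    "u1": [(date(2008, 1, 3), "phone-A"), (date(2008, 6, 20), "phone-B")],
}


def device_on(user_id, report_date):
    """Return the device in effect for user_id on report_date, or None."""
    history = user_device.get(user_id, [])
    effective_dates = [effective for effective, _ in history]
    # Index of the first entry dated strictly after the report date.
    idx = bisect_right(effective_dates, report_date)
    return history[idx - 1][1] if idx else None


print(device_on("u1", date(2008, 3, 1)))
```

Because a row is added only when a device actually changes, the table stays small while still answering the question for any report date.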
I designed a summary version (with day and partner as the highest-level keys) of each of the core tables, preserving necessary detail at the day and user level in 3 tables. One of these recorded message counts, another recorded time, and the third recorded the simple presence of the user in services not tracking time or message counts. I divided the parsing into stages. Stage one summarized each log into a pair of files: a summary file (facts summed up under day and partner) and a “detail file” containing the correct user detail for the service type. The next stage summed up these “work in progress” files for a given log-type, adding server names and log-types to a table designed especially for preserving this data. Existing keys read from the DBMS enabled an insert-or-update decision for both the summary and detail data. A new “file locking and status” table prevented the re-use of any given log-file. Rewritten report queries took advantage of the new tables. So long as the queries produced the same sums and totals (designed into the new tables), the report code on the whole did not need many changes.
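As a rough illustration of stage one, the following Python sketch reduces one log's records to a partner-level summary and a user-level detail set. The record shape (day, user, count), the `partner_of` lookup, and the sample values are hypothetical. The original was written in Perl and wrote its results to files rather than returning dictionaries.

```python
from collections import defaultdict


def stage_one(records, partner_of):
    """Reduce one log to a (day, partner) summary and a (day, user) detail set."""
    summary = defaultdict(int)   # (day, partner) -> total
    detail = defaultdict(int)    # (day, user)    -> total
    for day, user_id, count in records:
        summary[(day, partner_of[user_id])] += count
        detail[(day, user_id)] += count
    return dict(summary), dict(detail)


# Hypothetical reference data and parsed log records.
partner_of = {"u1": "CarrierA", "u2": "CarrierA", "u3": "CarrierB"}
records = [
    ("2008-01-15", "u1", 3),
    ("2008-01-15", "u2", 1),
    ("2008-01-15", "u1", 2),
    ("2008-01-15", "u3", 4),
]

summary, detail = stage_one(records, partner_of)
print(summary)
print(detail)
```

Four input records become two summary rows and three detail rows. With real logs, where a user appears many times, the reduction is far larger, and each later stage starts from this smaller data set.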
Performance gains at the report end were dramatic. The data for the reports themselves came either from the summary tables or the detail tables, never both in a given report. Those coming strictly from summary tables took mere seconds, a result of the 100 to 1000 to 1 decrease in total writes for these reports. Those coming from detail tables took longer, 2 to 4 hours, still well within performance specifications. Yet most of these latter gains were the result of the far smaller joins with the new primary reference tables.
The new software could not be as asynchronous as the original. Something had to track record changes to apply updates correctly. The totaling module read in a day’s worth of data from the DBMS, matching new data with the old by primary key. If a key combination existed in the DBMS, an update occurred; otherwise, an insert. Enabling simultaneous updates by multiple totaling modules required a “managing process”, undoing any gain achieved by parallelism. For a single module to meet performance specifications, the final data-set handed to the DBMS had to be small enough to write a day’s worth of a given log type in less than a day. In the end, the process needed about 4 hours to load an entire day for all log-types, but only by keeping the 3 detail tables small, a few months’ worth of reporting data at most. To facilitate this, I split the largest of the detail tables into two. Yet while the over-all system met its goals for record reduction, performance, and report reliability, it scaled only because the over-all record count was down an order of magnitude, coupled with the use of properly designed reference tables.
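The insert-or-update decision made by the totaling module might look something like this Python sketch. The function name and key layout are hypothetical, and the assumption that an update adds late-arriving counts to the stored total is mine. The text says only that an update occurred when the key already existed.

```python
def plan_writes(existing, incoming):
    """Split incoming totals into inserts and updates by primary key.

    existing: key -> total already stored in the DBMS for the day
    incoming: key -> total computed from newly parsed logs
    """
    inserts, updates = {}, {}
    for key, value in incoming.items():
        if key in existing:
            # Assumed rule: late data is added to the stored total.
            updates[key] = existing[key] + value
        else:
            inserts[key] = value
    return inserts, updates


# Hypothetical day of data: u1 is already stored, u2 is new.
existing = {("2008-01-15", "u1"): 5}
incoming = {("2008-01-15", "u1"): 2, ("2008-01-15", "u2"): 1}

inserts, updates = plan_writes(existing, incoming)
print(inserts)
print(updates)
```

The decision depends on a consistent view of the existing keys, which is why two totaling modules could not safely work on the same day at once without some coordinating process.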
Having finished the work described above, I faced the second half of the log data, another 9 log and table types. The legacy reports involving these tables joined most of these core tables not only with the partner-reference table, but with one another! These made for impossibly large joins. This data, having to do with e-mail, included not only day, user, and machine keys, but also message IDs, resulting in dozens, sometimes hundreds, of daily records for a single user! We added a design goal of avoiding core-table joins. Accomplishing this meant joining the core tables on the input side of the over-all process, creating specialized “reporting tables”. The architecture scaled even though each of the reporting tables contained user-level data, because none of the report queries needed any joins other than to small reference tables. Over-all record decrease was on the order of 10-to-1.
Because of the prejoin design, a “Perl only” solution was out of the question, as data joined in memory would be unmanageably large. We elected to turn what once were core tables into “staging tables” bulk-loaded using Oracle’s SQL Loader (sqlldr). One reason for the relatively poor performance (on input) of the first summarization strategy was that Perl still loaded many millions of records into the DBMS. The Perl DBI is a wonderful tool, but slower than Oracle’s bulk-load tools by a factor of 50 or more! Having loaded the staging tables, the work-flow control script would execute a set of PL/SQL procedures that performed the necessary joins and loaded the new “reporting tables”. This step, being Oracle-to-Oracle, was also fast, minimizing the number of inserts to the DBMS by totaling data under a completely sorted primary-key set before issuing a write. The system still supported updates (late data), but made the insert-or-update decision inside Oracle, and the subsequent write (insert or update) was much faster than a Perl execute through the DBI.
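The staging-to-reporting step can be sketched with Python's built-in sqlite3 module standing in for Oracle and PL/SQL. Everything here is an assumption made for illustration: the table and column names, the sample rows, and the use of UNION ALL to combine the staging tables (chosen to keep the sketch portable). The idea it demonstrates is the one described above: combine and total the staged rows inside the database, so each reporting key is written once.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical staging tables, one row per message per server.
    CREATE TABLE stage_sent (day TEXT, user_id TEXT, server TEXT, msg_id TEXT);
    CREATE TABLE stage_recv (day TEXT, user_id TEXT, server TEXT, msg_id TEXT);
    -- Hypothetical prejoined reporting table, one row per day and user.
    CREATE TABLE report_mail (
        day TEXT, user_id TEXT, sent INTEGER, received INTEGER,
        PRIMARY KEY (day, user_id)
    );
""")

# Bulk-load the staging tables (SQL Loader played this role in the real system).
conn.executemany("INSERT INTO stage_sent VALUES (?, ?, ?, ?)", [
    ("2008-01-15", "u1", "blade01", "m1"),
    ("2008-01-15", "u1", "blade02", "m2"),
    ("2008-01-15", "u2", "blade01", "m3"),
])
conn.executemany("INSERT INTO stage_recv VALUES (?, ?, ?, ?)", [
    ("2008-01-15", "u1", "blade03", "m4"),
])

# Total everything under the reporting key inside the database,
# then write each key exactly once.
conn.execute("""
    INSERT INTO report_mail (day, user_id, sent, received)
    SELECT day, user_id, SUM(sent), SUM(received)
    FROM (
        SELECT day, user_id, 1 AS sent, 0 AS received FROM stage_sent
        UNION ALL
        SELECT day, user_id, 0 AS sent, 1 AS received FROM stage_recv
    )
    GROUP BY day, user_id
    ORDER BY day, user_id
""")

rows = conn.execute(
    "SELECT day, user_id, sent, received FROM report_mail ORDER BY day, user_id"
).fetchall()
print(rows)
```

Four staged rows become two reporting rows, and the server and message ID keys that multiplied the record count never reach the reporting table.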
This second summary strategy (fig 1c) proved effective. At the back end, reports rewritten to take advantage of the prejoined tables ran in a consistent one to two hours. On the loading side we reduced the time needed to load a month of reporting data from six or seven days to one day, and this despite the initial bulk load of a very large number of records! But SQL Loader was not without its issues. Thousands of log files resulted in thousands of SQL Loader “logs” and other associated record collections, which had to be automatically analyzed and (usually) discarded by the workflow. Oracle has more refined tools than SQL Loader. “External tables” would have allowed us to load the reporting tables from the cleaned logs directly from inside Oracle in one step, if the company’s over-all architecture had permitted their use. SQL Loader runs from anywhere on the network, while the files behind an external table must reside on the same server as the DBMS, something we could not arrange.
The over-all design also proved to be much smoother, less prone to operational difficulties. The totaling module in the first approach had very large arrays and was prone to memory leaks when handling many days of data. Operations found it necessary to break this particular design-feature and carefully control the number of days processed simultaneously. The SQL Loader/PL/SQL version did away with the large Perl data structures and avoided this problem.
The new reporting system succeeded in turning multiple terabytes of log data into a collection of some two dozen different reports selectable for different partners and user-devices. The final reporting database contained less than 10% of the original log data, reducing storage cost and load time. The number of servers used to process logs and load the reporting database dropped from eight machines in the original system to two! The key insights behind the process were reducing dataset size at each transformation step, and recognizing that the final reporting records were essentially bibliographic in nature, something different from either a transactional or data-warehouse record architecture.
Transactional design minimizes (for a given key-set) data stored. Warehouse “star schema” separates each dimension of a query (the items of a “where clause”) from the “total-able facts” summed or counted, normally the content of a report. But a warehouse that supports improvised queries along many dimensions must, nevertheless, store a lot of data. Each record in a “facts collection” has a foreign key to every comparison dimension! Bibliographic records contain all their facts combined with their query dimensions as keys, avoiding large joins altogether! This results in some data redundancy, but bibliographic records are only “reporting data”, not passed along as further input to some other process. Their only purpose, from this point, is to be the “final source” for reports.
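The contrast between the two record styles can be made concrete with a small Python sketch. The carriers, devices, and counts are invented for illustration, and plain dictionaries stand in for database tables. In the star-schema version each filter must resolve a foreign key against a dimension, while the bibliographic version carries its dimensions in the record key.

```python
# Star-schema style: facts hold foreign keys, dimension values live elsewhere.
partner_dim = {1: "CarrierA", 2: "CarrierB"}
device_dim = {10: "phone-A", 11: "phone-B"}
facts = [
    {"day": "2008-01-15", "partner_id": 1, "device_id": 10, "messages": 5},
    {"day": "2008-01-15", "partner_id": 1, "device_id": 11, "messages": 2},
    {"day": "2008-01-15", "partner_id": 2, "device_id": 10, "messages": 7},
]


def star_total(partner, device):
    """Sum messages, resolving each foreign key against its dimension (a join)."""
    return sum(
        fact["messages"]
        for fact in facts
        if partner_dim[fact["partner_id"]] == partner
        and device_dim[fact["device_id"]] == device
    )


# Bibliographic style: the query dimensions are part of the record's own key.
biblio = {
    ("2008-01-15", "CarrierA", "phone-A"): 5,
    ("2008-01-15", "CarrierA", "phone-B"): 2,
    ("2008-01-15", "CarrierB", "phone-A"): 7,
}


def biblio_total(partner, device):
    """Sum messages with a plain filter on the key; no lookups are needed."""
    return sum(
        messages
        for (_day, rec_partner, rec_device), messages in biblio.items()
        if rec_partner == partner and rec_device == device
    )


print(star_total("CarrierA", "phone-A"), biblio_total("CarrierA", "phone-A"))
```

Both functions return the same total. The bibliographic form repeats the carrier and device names in every record, which is the redundancy mentioned above, in exchange for never needing the dimension lookups.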
The tradeoff was reporting flexibility. The data collection supported only existing reports and a limited number of dimensions (partner and device in any combination). Each record in a “reporting table” carried all data necessary to support every report involving that particular table. In the original system it was possible to answer ad hoc queries. A well-designed data warehouse works to flatten out joins such that all types of queries have about the same “join cost”, but in the end all the data needed to support improvised queries has to be stored. By designing bibliographic reporting tables to support specific reports, we lost the query flexibility inherent in the original reporting system, gaining in trade an enormous reduction in data volume coupled with huge performance improvements.
Role of ETL tools:
The architect’s original design was faulty. It failed to account for the necessary retention of detail data, and it came coupled with a severe prejudice against Oracle’s bulk-load tool and PL/SQL. His preference was to use an ETL (extract, transform, load) or DI (data integration) tool such as IBM’s DataStage, Informatica, Pervasive, or others. Using such a tool would have allowed us to bypass the bulk-load plus PL/SQL steps while delivering insert/update performance closer to theirs than to that of the Perl DBI. I have nothing against ETL products. But the company didn’t have one. They tend to be expensive, both in the necessary capital outlay and in continuing maintenance cost, compared to Perl (free) and DBMS tools (free given that the DBMS is already paid for and maintained).
In general the cost of ETL software makes sense when large-volume data from multiple departments or business divisions is integrated into a central data warehouse. Because the tools are expensive, companies that employ them tend, in my experience, to become obsessive about their use. Issues usually arise not because the tool cannot load the target DBMS, but because data sources are often unclean, inconsistent with one another, and may have unusual formats difficult for the tool to read. ETL tools are not usually good at analyzing and cleaning dirty data. They cannot automatically resolve inconsistencies. Today’s tools handle unusual formats through “application connectors” created using the tool’s own facilities. But the built-in facilities are often unable to encompass unusual formats. Staying “within the tool” often means overcoming this last problem with a relatively weak scripting language, typically JavaScript.
By contrast, powerful scripting languages like Perl make short work of data cleaning, reconciliation, and canonicalization. A strategy that makes use of powerful scripting tools to clean data and ETL tools to load data (fig 3) provides the best of both worlds and often shortens over-all integration project development time by a factor of two or more.