MongoDB Engineering Blog

Posts on Engineering efforts, achievements, and culture at MongoDB.

Empowering Automation in Jira: Harnessing the Power of ScriptRunner

As a program manager and a systems engineer, respectively, we support teams that use Jira, the popular project management tool, intimately and on a daily basis. We get many requests from our teams to tweak or improve processes that use Jira. While Jira's native automation capabilities are adequate in many cases, there are occasions where additional power and flexibility are required. In this article, we explore one such case, where we turned to ScriptRunner, a plugin that let us use the Groovy programming language to build a custom automation that matches text against a predefined list of URLs. This example showcases the significance of automation for program managers, systems administrators, and engineering teams alike.

The challenge

One of the teams we support, documentation, noted that there was no good way to prioritize work on specific documentation pages, as each ticket's value seemed relatively equal. The team needed to be able to set priority and know the exact rank of a page's URL based on a ranking list, while still maintaining the flexibility to change priority at will. To do this, the system would have to match text (specifically URLs) against a predetermined list of URLs and trigger specific actions based on the result.

Leveraging Jira, ScriptRunner and Groovy

Jira's built-in automation features can streamline and simplify numerous processes:

- Workflow automation: Automate transitions, status updates, and field changes within Jira workflows based on predefined triggers or conditions.
- Notifications and alerts: Set up automated notifications and alerts to keep stakeholders informed about critical updates, changes, or upcoming deadlines.
- Issue creation and linking: Automatically generate new issues or establish associations between existing ones, saving time and ensuring data integrity.
- SLA management: Automate the monitoring and enforcement of service level agreements (SLAs), ensuring timely responses and resolution of issues.
- Custom rule creation: Design custom automation rules to suit your specific needs, allowing for tailored and efficient process management.

While these capabilities are robust, our situation required customization and automation that exceeded the out-of-the-box functionality. How did we leverage Jira to get the outcomes we wanted? We discovered that ScriptRunner lets us write Groovy code and run it as a listener. Luke crafted a listener that monitors incoming tickets and matches the provided text against our predefined list of URLs. Once the condition is met, the listener triggers the following actions:

- Change in priority status: The ticket's priority is automatically updated based on the URL match, ensuring that the most critical issues receive immediate attention.
- Label addition: A specific label is added to the ticket, providing additional context and enabling easy identification of relevant issues.
- siteRank: The rank of the highest-priority matching URL is written to the siteRank custom field, so people can see at a glance where that page ranks.
Check out Luke's code in Groovy (with great assistance from Stack Overflow):

```groovy
import com.atlassian.jira.component.ComponentAccessor
import com.atlassian.jira.event.type.EventDispatchOption
import com.atlassian.jira.issue.Issue
import com.atlassian.jira.issue.index.IssueIndexingService
import com.atlassian.jira.issue.label.LabelManager
import com.atlassian.jira.util.ImportUtils

// Defining the variable and initializing it with list of strings
def prioritySites = [
    'this space intentionally left blank',
]

// Route Issue Created and Generic Events
def customFieldManager = ComponentAccessor.getCustomFieldManager()
def issueManager = ComponentAccessor.getIssueManager()
def issueIndexingService = ComponentAccessor.getComponent(IssueIndexingService)
int p = 4
def finalM = 300
def intentionality = 0

try {
    def changeHistoryManager = ComponentAccessor.getChangeHistoryManager()
    def changeItems = changeHistoryManager.getAllChangeItems(issue)
    // Find this update's changeGroup
    def changeGroup = changeItems.last()['changeGroupId']
    for (change in changeItems) {
        def thisChangeGroupID = change['changeGroupId']
        if (thisChangeGroupID == changeGroup) {
            if (change['field'] == 'URL(s)') {
                intentionality = 1
            }
        }
    }
} catch (Exception ex) {
    // New issues have no log to consider.
    log.warn('Bombed finding a changegroup, this is probably a new issue.')
    intentionality = 1
}

def urlField = customFieldManager.getCustomFieldObjectByName('URL(s)')
if (intentionality == 0) {
    log.warn('No change to URL(s) Field. Aborting.')
    return 0
}

try {
    // The custom field 'urlField' *may* contain one or more URLs.
    // The issue's priority will be determined by the most-critical
    // URL, so split the field contents and iterate over all values
    def urls = event.issue.getCustomFieldValue(urlField).split()
    for (url in urls) {
        int index = prioritySites.findIndexOf { it == url }
        if (index < finalM && index > 0) {
            finalM = index
        }
    }
    if (finalM.toInteger() < 300 && finalM.toInteger() > 0) {
        // At least one URL in "urlField" is high-priority. Add a label,
        // prioritize the issue accordingly
        def labelManager = ComponentAccessor.getComponent(LabelManager)
        def currentUser = ComponentAccessor.jiraAuthenticationContext.loggedInUser
        def issue = event.issue as Issue
        def siteRank = customFieldManager.getCustomFieldObjectByName('siteRank')
        issue.setCustomFieldValue(siteRank, finalM.toString())
        labelManager.addLabel(currentUser, issue.id, 'top250', false)
        if (intentionality == 1) {
            // Only set a priority if one was not included with the current change.
            if (finalM <= 150) {
                issue.setPriorityId(p.toString(2))
            } else {
                issue.setPriorityId(p.toString(3))
            }
        }
        // Update issue
        ComponentAccessor.issueManager.updateIssue(currentUser, issue, EventDispatchOption.DO_NOT_DISPATCH, false)
        boolean wasIndexing = ImportUtils.isIndexIssues()
        ImportUtils.setIndexIssues(true)
        log.warn("Reindex issue ${issue.key} ${issue.id}")
        issueIndexingService.reIndex(issueManager.getIssueObject(issue.id))
        ImportUtils.setIndexIssues(wasIndexing)
    } else {
        // There is something in the urlField, but no high-priority URLs.
        // Remove the "top250" label and siteRank data
        log.warn('No known URLs here')
        def labelManager = ComponentAccessor.getComponent(LabelManager)
        def currentUser = ComponentAccessor.jiraAuthenticationContext.loggedInUser
        def issue = event.issue as Issue
        // remove top250
        def labels = labelManager.getLabels(issue.id).collect { it.getLabel() }
        labels -= 'top250'
        labelManager.setLabels(currentUser, issue.id, labels.toSet(), false, false)
        // Empty "siteRank" field
        def siteRank = customFieldManager.getCustomFieldObjectByName('siteRank')
        issue.setCustomFieldValue(siteRank, null)
        if (intentionality == 0) {
            issue = event.issue as Issue
            issue.setPriorityId(p.toString(4))
        }
        // Update issue
        ComponentAccessor.issueManager.updateIssue(currentUser, issue, EventDispatchOption.DO_NOT_DISPATCH, false)
        boolean wasIndexing = ImportUtils.isIndexIssues()
        ImportUtils.setIndexIssues(true)
        log.warn("Reindex issue ${issue.key} ${issue.id}")
        issueIndexingService.reIndex(issueManager.getIssueObject(issue.id))
        ImportUtils.setIndexIssues(wasIndexing)
    }
} catch (Exception ex) {
    log.warn('No URLs at-all here')
    def labelManager = ComponentAccessor.getComponent(LabelManager)
    def currentUser = ComponentAccessor.jiraAuthenticationContext.loggedInUser
    def issue = event.issue as Issue
    // remove top250
    def labels = labelManager.getLabels(issue.id).collect { it.getLabel() }
    labels -= 'top250'
    labelManager.setLabels(currentUser, issue.id, labels.toSet(), false, false)
    // Empty "siteRank" field
    def siteRank = customFieldManager.getCustomFieldObjectByName('siteRank')
    issue.setCustomFieldValue(siteRank, null)
    issue = event.issue as Issue
    issue.setPriorityId(p.toString(4))
    // Update issue
    ComponentAccessor.issueManager.updateIssue(currentUser, issue, EventDispatchOption.DO_NOT_DISPATCH, false)
}
```

The results

With this automation in place, we now have a clear understanding of the impact each ticket holds.
By identifying the top 250 Docs URL pages (which constitute 52.7% of our traffic!), we can prioritize them above other work that comes through the triage queue. The team can now determine at a glance which tickets involve these high-impact pages, enabling us to act promptly and efficiently to improve our most valuable content.

Incorporating MongoDB's values of Build Together and Make it Matter into our Jira automation project was instrumental in achieving a successful outcome. Embodying the value of Build Together, we fostered a collaborative environment by pairing in real time to conduct extensive testing and in-depth discussions about the project's requirements. Making a tangible impact aligns perfectly with MongoDB's core value of Make it Matter, as we direct our efforts toward tasks that drive significant value for the organization and its users.

This example highlights the importance of automation for program managers, systems administrators, and engineering teams alike, showcasing how tailored solutions can enhance productivity, accuracy, and overall efficiency within Jira. With the right tools and mindset, the possibilities for automation are vast, empowering teams to accomplish more in less time and with greater precision. While our use case is specific to matching URLs, one could imagine many other use cases, such as matching keywords, tags, and so on.

Automation matters to the various stakeholders involved in the software development lifecycle. Here's why:

- Enhanced productivity: Automating repetitive tasks frees up valuable time, enabling program managers, systems administrators, and engineering teams to focus on more strategic activities.
- Improved accuracy: Automation reduces the risk of human error and ensures consistent adherence to predefined processes and standards.
- Faster response times: By automating workflows and actions, teams can respond swiftly to critical issues and deliver faster resolutions.
- Scalability and consistency: Automation provides a scalable solution that maintains consistent outcomes, regardless of the number of processes or tickets involved.
- Transparency and visibility: Automated processes offer increased visibility into the status, progress, and performance of projects, enabling better decision-making.

Our Jira automation project not only streamlined processes, it also forged new partnerships and fostered innovation. Through collaboration and knowledge sharing, we developed an automation solution that aligned precisely with our unique requirements. By prioritizing the most impactful pages in our documentation, we contribute meaningfully to the success and efficiency of our team's efforts. This project stands as a testament to the power of working together, embracing new challenges, and striving to make a real difference in everything we do at MongoDB.

August 14, 2023
Engineering Blog

Implementing an Online Parquet Shredder

In Atlas Data Federation (ADF) we've found that many customers want to take their data, say from an Atlas cluster, and emit it to their own blob storage as a parquet file. This is a popular and, for many customers, important feature we're proud to provide. Converting data from our native BSON format, complete with flexible schema and row-based representation, to the fixed-schema, columnar parquet format without compromises is not a computationally easy task. We've provided this to customers for years now, but with our February 21st release we're excited to announce a much-improved BSON-to-Parquet writer.

Among behind-the-scenes changes, one exciting improvement that will be immediately visible to customers is that the efficiency of our parquet writer is substantively improved, both in CPU and in memory usage. Our internal benchmarks show a 2x improvement in throughput, and our production metrics indicate that this roughly bears out for our customer workloads, of course with some variance. We have only ever seen throughput improvements; we have not found a single workload that has worse performance. In this blog post I'll share a high-level view of the core technical challenge and how we built together at MongoDB to create this solution.

The technical challenge

If you use ADF and issue an aggregate that ends with an $out stage and the parquet format specified, we'll take the result of that aggregation and emit it to a parquet file in the named blob storage. We can imagine an aggregate's result as a stream of BSON documents. The role ADF plays, then, is to take that document stream, convert it to a parquet file, and place it in the blob storage. Smoothing away the rough details, we're left with the core technical challenge: we want to convert a stream of row-based, flexible-schema BSON documents into a columnar, fixed-schema parquet file. That's quite a challenge!

Our initial approach

Our past approach understandably focused on stability and simplicity over everything else. We built out our initial parquet offering by leveraging existing, production-tested subcomponents. In broad strokes, it did the following:

1. Scan through the entire document stream to build the global schema that describes them all.
2. Open a parquet file with that schema, and then scan through the documents again, "shredding" their contents one-by-one into the parquet file's columns.

This two-pass algorithm worked and served our customers, but has two fundamental drawbacks. The most immediate drawback is exactly that it is two-pass, and the second pass has to entirely wait on the first: we're unable to create a parquet file until we know the schema, and we can't be certain of what the schema is until we've seen the last document (for example, the last document may introduce a new field). The second drawback is that, when we shred our documents, we're placing them directly in the file. A consequence of this is that, absent significant re-engineering, it is very difficult to flush the contents to disk until the file is complete. [1]

As we saw customers using $out-to-parquet more and more, we knew it would be worth the investment to shift to more performant machinery.

Changes for our new approach

Our new approach introduces what we call the Online Shredder. This algorithm enables us to effectively shred a stream of documents into a columnar format in a single pass, building the schema in parallel. It doesn't shred immediately into a parquet file, but instead into an intermediate file that enables us to more easily swap memory to disk. Finally, we're excited to leverage the apache-go parquet writer. We've found it to be an exceptionally performant and feature-rich library.

Most of the rest of this blog post is a high-level description of the design and implementation of the online shredder that supports this new approach.
The new approach in-depth

We have a stream of documents, and we want to shred them into a columnar form. The overall challenge behind this operation is supporting MongoDB's flexible schema and maintaining the right information for lossless parquet emission. In particular, the "language barrier" between MongoDB's row-based, flexible schema and Parquet's column-based, fixed schema introduces 3 hurdles:

- Missing or New Fields: One document may have a field, and then a later document doesn't include that field. Or, we may have a new document that for the first time ever introduces a new field.
- Polymorphism: Two documents may have a field with the same namepath, but different types.
- Structural Metadata: Parquet enables reassembly of its records using metadata that encodes information derived from the global schema. With our online approach, we have to commit to this metadata without yet fully knowing the global schema.

We'll describe the design of the online shredder first by explaining how it builds a schema as it scans over the documents. Then we'll build off of those concepts and extend them to not just build the schema, but shred the documents into an in-memory columnar representation. We will see that the technical challenge in shredding comes from efficiently maintaining the structural metadata. The punchline will be that careful application of invariants from our schema-building gives us an elegant approach to maintaining this metadata.

Introducing the schema

Our document stream may contain documents that vary wildly in structure. For example, say the collection we want to emit as Parquet has these 4 documents:

{e: 5}
{a: "s", b: {c: "t"}}
{}
{a: 8, b: {c: "r", d: true}}

To put these 4 documents in the same file, parquet requires they all be described by the same schema. For our purposes we can understand a schema as a sort of meta-document: a document with key-value pairs, but with special meta-values.

Concretely, here are the individual schemas for those four documents:

[Figure: the individual schema of each of the four documents]

The nested sub-document under b in the second and fourth documents illustrates how a schema for a document is recursively defined.

Design: The schema illustrations above are quite close to our in-memory representation. Our schema is a tree: the blue nodes represent the names, mapping closely to the typical b.c way of referring to a field in a (nested) document. The square nodes maintain what types we've seen under that key-path.

The schema of multiple documents is naturally defined by the union (or merge) of each document's individual schema. So the schema for our four-document collection is:

[Figure: the merged schema of the four-document collection]

A few highlights:

- Note that we don't preserve the "frequency" of keys. Our schema doesn't distinguish whether a field appears in every document, or just some documents.
- Merging schemas is commutative; we'll always get the same schema no matter the order in which we visit the documents.

The schema information is enough for us to tell the parquet file what columns it will need, so we can start laying it out. Roughly, each leaf (representing a scalar value in our schema, such as a string or integer) will map to a column in the parquet file. Typically we'd name each column based on the names we see on the path from the root to that leaf. For instance, in this tree we'd have b.c and b.d as two column names.

The atypical case is in supporting polymorphism. See how in our example the name a refers to two different parquet columns: one of type int32, and another of type string. In these cases we disambiguate the columns by appending their type-name at the end. So the 5 columns from this schema will be named: a.int32, a.string, b.c, b.d, e. [2]

Code to build the schema

We know we need our schema, and we know what it should look like. All that's left is writing code to actually compute the schema.
We can compute it inductively:

1. Start with an empty schema.
2. On the next document, walk that document's tree-structure and the schema-tree in lockstep. If there is ever a node in our document that is missing from our schema-tree, add that node.
3. Repeat step 2 for the next document until done.

At the end of each step 2, the schema we have built is the schema for all the documents seen so far. Here is some pseudocode describing step 2 in more depth:

[Figure: pseudocode for the visitType and visitName functions]

The first parameter of each function is the document we're walking, and the second parameter is the current schema-tree node we're in. The visitType function refers to the square nodes in our tree, and the visitName function to the round nodes. Note how on line 6 we are recursing into the individual key-value pairs of our document: that's our actual tree-walk.

To illustrate its implementation, here is how we'd iteratively build the schema over our four documents. Each image is the schema after step 2. We yellow-highlight the nodes we allocate, or visit, as we traverse the latest document.

[Figures: the schema after processing each of the four documents]

Note that this walk is driven by the document size: even if our schema is ultimately very big, the time it takes to process a document depends only on the size of that document. When we shred document 3, we only visit the root, for instance. Hooray!

This document-driven approach was our first step towards performance improvements in parquet-writing. Previously we had a more schema-driven approach that used more allocations and node-traversals.

We're ready to extend this foundation and shred our documents in the same pass. Before we jump in, let's conclude with one crucial invariant that is apparent as a result of how we build our schema:

Online schema invariant

Once we place a leaf node in our tree, the path to that leaf never changes. In other words, we only ever place new leaves; we never insert a node "above" an existing one.
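The inductive schema build described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the production implementation: plain dicts stand in for BSON documents, the names `type_name`, `visit_type`, `visit_name`, `build_schema`, and `column_names` are hypothetical, and only the handful of types in the running example are handled.

```python
def type_name(value):
    """Map a Python value to an illustrative type name (assumed names)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "bool"
    if isinstance(value, int):
        return "int32"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return "document"
    raise TypeError(f"unsupported value: {value!r}")


def visit_type(doc, fields):
    """Walk one (sub)document; `fields` maps field name -> name node."""
    for key, value in doc.items():
        name_node = fields.setdefault(key, {})  # add the name if it is new
        visit_name(value, name_node)


def visit_name(value, name_node):
    """Record the type seen under a name; recurse into nested documents."""
    kind = type_name(value)
    if kind == "document":
        visit_type(value, name_node.setdefault(kind, {}))
    else:
        name_node.setdefault(kind, None)  # scalar leaf: nothing below it


def build_schema(docs):
    schema = {}  # step 1: start with an empty schema
    for doc in docs:  # steps 2 and 3: merge each document in turn
        visit_type(doc, schema)
    return schema


def column_names(fields, prefix=""):
    """List leaf columns, appending the type only where a name is polymorphic."""
    names = []
    for key in sorted(fields):
        name_node = fields[key]
        for kind in sorted(name_node):
            path = prefix + key
            if kind == "document":
                names += column_names(name_node[kind], path + ".")
            elif len(name_node) > 1:
                names.append(f"{path}.{kind}")
            else:
                names.append(path)
    return names


docs = [{"e": 5}, {"a": "s", "b": {"c": "t"}}, {}, {"a": 8, "b": {"c": "r", "d": True}}]
schema = build_schema(docs)
print(column_names(schema))  # ['a.int32', 'a.string', 'b.c', 'b.d', 'e']
```

Because nodes are only ever added with `setdefault` and never moved, the sketch also respects the invariant: a leaf's path is fixed from the moment it is created. The walk touches only the keys of the current document, so the empty document costs a single visit to the root.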
We will rely on the online schema invariant heavily to show that we're correctly maintaining the structural metadata in the course of shredding.

Introducing shredding

We can now rest assured that, when the time comes to create a parquet file, we will have the right schema information to tell it what columns to create. Of course, now we need to get the actual data to fill in those columns.

Recall that one of the key challenges we anticipated was maintaining the metadata. This metadata is what parquet needs to remember the structure of each document. These are called definition levels and repetition levels. Each column in parquet, in addition to its actual data column, can have two more columns of this additional metadata. [3]

Before exploring how our shredding builds and maintains this information, let's take a brief detour to build an intuition about how this metadata works.

Using definition and repetition levels

Say we've naively shredded our 4-document stream without regard to any metadata. For visualization, we'll attach the column contents to each leaf in our schema. So after shredding our data looks like:

[Figure: the schema tree with the shredded values attached to each leaf]

This presents a challenge. Say one day we want to reassemble our documents. What would our documents look like? We might very well guess the correct sequence, but absent additional metadata, we might as well reassemble this data into something like:

{a: 5, b: {c: "s", d: true}, e: 8}
{a: "r", b: {c: "t"}}

Plainly these are very different from our original documents. That's not good! So we need a way to disambiguate our shredded values; repetition and definition levels enable this in parquet. The full explanation can be found in the Dremel paper [4] (see also this very clear 2013 Twitter blog post [5]).

For brevity, here we're just going to be looking at definition levels. Repetition levels are different of course, but similar enough that the mechanisms we'll ultimately highlight work the same way.
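To make the ambiguity concrete, here is a small Python sketch of naive shredding. It is an illustration only: `naive_shred` is a hypothetical helper, dicts stand in for BSON documents, and for simplicity columns are keyed by path alone (no per-type split for the polymorphic field a). The second document stream is made up for the demonstration.

```python
def naive_shred(docs):
    """Collect values per dotted path, discarding all structural information."""
    columns = {}

    def walk(doc, prefix):
        for key, value in doc.items():
            path = prefix + key
            if isinstance(value, dict):
                walk(value, path + ".")
            else:
                columns.setdefault(path, []).append(value)

    for doc in docs:
        walk(doc, "")
    return columns


original = [{"e": 5}, {"a": "s", "b": {"c": "t"}}, {}, {"a": 8, "b": {"c": "r", "d": True}}]
# A different, two-document stream that happens to hold the same values per path.
other = [{"e": 5, "a": "s", "b": {"c": "t", "d": True}}, {"a": 8, "b": {"c": "r"}}]

# Both streams shred to identical columns, so the columns alone cannot
# tell us which stream (or even how many documents) we started from.
print(naive_shred(original) == naive_shred(other))  # True
```

Two different streams collapsing to the same columns is exactly the information loss that definition levels are meant to prevent.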
Definition Levels: The i-th definition level in a column indicates how many names along the path to that column were present for document i. [6] This means that as we process a document, we promise to write some definition level for each leaf. [7]

To introduce this idea, consider the leaf b.d. After processing all documents, it will have the definition levels "0,1,0,2". Here is how these b.d definition levels are derived as we process our four documents:

- Document 1, {e: 5}: neither b nor d is present, so the level is 0.
- Document 2, {a: "s", b: {c: "t"}}: b is present but d is not, so the level is 1.
- Document 3, {}: nothing is present, so the level is 0.
- Document 4, {a: 8, b: {c: "r", d: true}}: both b and d are present, so the level is 2.

Now we can consider how our whole tree looks with all the definition levels.

[Figure: the schema tree with values and definition levels attached to each leaf]

This picture tries to demonstrate a lot of the impact of definition levels. Some highlights:

- Consider column e: it has definition levels 1, 0, 0, and 0. The 1 indicates that the first document ({e: 5}) has 1 name defined along the path to e. Said another way, that first 1 indicates that the value in the column-store, 5, is present in the first document. As e does not appear in any other documents, it has 0 for the rest of its definition levels.
- We discussed column b.d, with def-levels 0, 1, 0, 2, above. We can compare it to its sibling, b.c. In particular, this highlights how the number of values a column has is exactly equal to the number of times the max definition level appears. The definition-level stream for b.d only has the max definition level, 2, once. It only has one value: true. Its sibling, however, has the max definition level, 2, twice, and correspondingly has two values.
- Columns a.string and a.int32 show that even though the name a appears in two documents, we have to disambiguate the underlying data type. The levels say that the second document contains a as a string, and the fourth document contains a as an int32. This is a detail that reveals some of the extra work we take to translate polymorphic documents into parquet-amenable formats.

Hopefully this suggests a reassembly-into-documents algorithm: for the i-th document, we look at the i-th def-level of each column. When it's equal to the number of names on our path, we pop off the next value in the value-stream associated with that column. There are more complications and subtleties in practice, but this captures the spirit of it.

Code to shred the document stream

We've covered how definition levels are used to reassemble the original documents. Now we can turn to our task of actually computing the definition levels as we shred our documents. Two challenges present themselves:

1. We want to emit the correct definition level as we process each document, but that definition level implicitly depends on the structure of the schema (it's "how many names were on that path"). We're building our schema online: how can we be confident that the definition level we write shouldn't have been different as the schema grows to accommodate more documents?
2. If the 10th document introduces some column named z, then we clearly haven't been adding any definition levels to that column for the past 9 documents, because we didn't know the column existed yet! How might we "backfill" the required def-levels? It's not enough to simply fill them with 0s, because (recall our 2nd example document, the b.d path) sometimes we need an intermediate def-value.

The first challenge is easily addressed: recall the crucial Online Schema Invariant, that a path to a leaf never changes. A consequence of that invariant is that we can be confident in the definition levels for any leaf. Its depth (i.e., the number of names to that leaf) is not going to change. Hooray! Hopefully as presented this punchline is very clear and obvious to you; during the course of development, however, this result took some careful reasoning to establish.

It is the second challenge that requires even more thought. Ultimately we found we could leverage the "lazy syncing" of def-levels the Dremel paper introduces.
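As a reference point for what any shredder must produce, the following Python sketch computes definition levels directly when the full set of columns is already known (in the spirit of the older two-pass approach) and then reassembles the documents from them. It is a simplified illustration, not the production code: the helper names are hypothetical, columns are written as (path, type) pairs, every field is treated as optional, and repetition levels are ignored.

```python
def type_name(value):
    """Illustrative type names; bool is checked first since bool subclasses int."""
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int32"
    if isinstance(value, str):
        return "string"
    return "document"


def definition_level(doc, path, leaf_type):
    """Count how many names along `path` are present in `doc`."""
    node, level = doc, 0
    for i, name in enumerate(path):
        if not isinstance(node, dict) or name not in node:
            break
        node = node[name]
        if i == len(path) - 1 and type_name(node) != leaf_type:
            break  # the name exists, but as another type (a different column)
        level += 1
    return level


def shred(docs, columns):
    out = {col: {"defs": [], "values": []} for col in columns}
    for doc in docs:
        for path, leaf_type in columns:
            level = definition_level(doc, path, leaf_type)
            col = out[(path, leaf_type)]
            col["defs"].append(level)
            if level == len(path):  # max definition level: a value is present
                value = doc
                for name in path:
                    value = value[name]
                col["values"].append(value)
    return out


def reassemble(shredded, num_docs):
    cursor = {col: 0 for col in shredded}  # next unread value per column
    docs = []
    for i in range(num_docs):
        doc = {}
        for col, data in shredded.items():
            path, _ = col
            level = data["defs"][i]
            node = doc
            # Recreate the enclosing documents that were present.
            for name in path[:min(level, len(path) - 1)]:
                node = node.setdefault(name, {})
            if level == len(path):  # pop the next value for this column
                node[path[-1]] = data["values"][cursor[col]]
                cursor[col] += 1
        docs.append(doc)
    return docs


docs = [{"e": 5}, {"a": "s", "b": {"c": "t"}}, {}, {"a": 8, "b": {"c": "r", "d": True}}]
columns = [(("a",), "int32"), (("a",), "string"), (("b", "c"), "string"),
           (("b", "d"), "bool"), (("e",), "int32")]
shredded = shred(docs, columns)
print(shredded[(("b", "d"), "bool")]["defs"])  # [0, 1, 0, 2]
print(reassemble(shredded, len(docs)) == docs)  # True
```

Note that this baseline visits every column for every document, which is precisely the schema-driven cost an online, document-driven shredder tries to avoid.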
We were already interested in using lazy syncing (Section 4.2 and Appendix A of the Dremel paper; see footnote 8) to keep our per-document walk proportional to the document size, but it was a happy moment to see it could help us in the new-column case. Lazy syncing is mechanically simple but conceptually subtle, so let's dive in.

Lazy syncing

To set the stage for lazy syncing, we extend our definition-streams to interior (type-)nodes. In our example, that means our final shredding would look more like:

[Figure: the shredded schema tree with definition levels on interior document nodes as well as leaves]

See how the two document nodes now have definition levels. The definition levels express the same idea: how many names did we see along this path for the i-th document. Let's walk through our four-document example one more time, now shredding in addition to building the schema.

Shredding document 1

This is the simple case. This is our first-ever document, so we know there are no def-levels to "backfill", and no nodes we're trying to skip. We highlight the nodes where we're adding definition levels.

[Figure: shredder state after document 1]

Shredding document 2

Now things are getting interesting. We don't highlight the e column because our algorithm doesn't touch it for document 2. We do, however, inevitably touch one of its ancestors (in this case, the root).

[Figure: shredder state after document 2]

Syncing: When we create the a, b, and b.c columns, how did we know to backfill the definition levels for the first document all with 0? This is because as we write the def-level of the current document, we first always check to make sure we're in sync with our parent for any prior documents, and so on transitively.

To narrate an example of syncing: When we created the a column, our code basically said "and your second document def-level is 1". You might imagine the a column panicking: it never heard what its first document def-level is! What might you do when you panic? You might talk to your parent: in this case a's parent is the root, and a asks "what did you see for the first document?" The root says 0, and a can trust that 0 is the deepest the first document ever got to a.

For a more complicated case, see what happens with b and b.c. In that case, they both were written to for the first time in this document. They were both initially skipped, so they both learn 0 from their parent (transitively), but of course b.c was written to in this document, so it stops syncing at that latest document and writes a 2 instead of the 1 that b writes.

We only sync a column when we write to it. In particular, see how we don't sync the def-levels for e. This subtlety is key to enabling us to shred documents efficiently: when we shred a document, we only have to traverse the nodes in that document, not the (potentially much larger) number of nodes in the whole schema.

This is syncing in action. Let's see a few more examples.

Shredding document 3

This is a sort-of easy case: we hardly write anything. Instead of (for now) having to walk the whole schema, we're only touching the root node. After all, we don't have much more information to write for the empty document. Implicitly, we're relying on the syncing machinery to ultimately propagate this information as required.

[Figure: shredder state after document 3]

Shredding document 4

We finally ingest our last document. See in particular how the new-to-us column of b.d is able to sync the previous 3 def-levels from its parent, correctly learning the "1" for the second row.

[Figure: shredder state after document 4]

Almost without noticing, our syncing machinery gives us the ability to handle entirely new columns: we simply create the new column, and then sync with its existing parent. The only thing we need to rely on is that we have some global ancestor that sees every document: but of course, that's exactly the root, as we saw with document 3.

Finally, note also that this doesn't quite look like the final form we expected: we ultimately do need 4 def-levels on every leaf.
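The four-document walk-through can be replayed with a short Python sketch of lazy syncing. This is a simplified model rather than the production shredder: the `Node` class and helper names are hypothetical, name and type nodes are folded into one node keyed by (name, type), dicts stand in for BSON documents, repetition levels are ignored, and a `sync_all` helper is included so that every leaf ends up with one def-level per document.

```python
def type_name(value):
    """Illustrative type names; bool is checked first since bool subclasses int."""
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int32"
    if isinstance(value, str):
        return "string"
    return "document"


class Node:
    def __init__(self, parent=None, depth=0):
        self.parent = parent
        self.depth = depth   # number of names on the path; never changes
        self.children = {}   # keyed by (name, type name)
        self.defs = []       # definition levels written or synced so far
        self.values = []     # values (used by leaves only)

    def sync(self, upto):
        """Backfill def-levels for skipped documents by asking the parent."""
        if len(self.defs) >= upto:
            return
        self.parent.sync(upto)  # the parent may itself be behind
        # For a skipped document, this node got exactly as deep as its parent.
        self.defs.extend(self.parent.defs[len(self.defs):upto])


def shred_value(value, node, doc_index):
    node.sync(doc_index)          # catch up on earlier documents first
    node.defs.append(node.depth)  # this node is present in the current document
    if isinstance(value, dict):
        for key, child_value in value.items():
            child_key = (key, type_name(child_value))
            if child_key not in node.children:  # new column: create it on the fly
                node.children[child_key] = Node(node, node.depth + 1)
            shred_value(child_value, node.children[child_key], doc_index)
    else:
        node.values.append(value)


def shred(docs):
    root = Node()  # the root sees every document
    for i, doc in enumerate(docs):
        shred_value(doc, root, i)
    return root


def sync_all(node, num_docs):
    """Final catch-up pass so every node has one def-level per document."""
    node.sync(num_docs)
    for child in node.children.values():
        sync_all(child, num_docs)


docs = [{"e": 5}, {"a": "s", "b": {"c": "t"}}, {}, {"a": 8, "b": {"c": "r", "d": True}}]
root = shred(docs)
b = root.children[("b", "document")]
e = root.children[("e", "int32")]
print(b.children[("d", "bool")].defs)  # [0, 1, 0, 2]
print(e.defs)                          # [1]  (never written again, so not yet synced)
sync_all(root, len(docs))
print(e.defs)                          # [1, 0, 0, 0]
```

In this model each document only touches the nodes on its own paths; untouched columns such as e fall behind and are caught up only when they are next written or in the final pass.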
One last "global-sync", where we propagate the defs down to the leaves until everyone's caught up, finalizes our shredder-state into what we expect:

[Figure: the final shredder state, with four def-levels on every leaf]

Phew, the pseudocode

Congratulations on getting through all that shredding. Hopefully this has been a thorough illustration of how this one key technology, syncing, enables us to extend our shredder to handle new columns as they appear online. While conceptually nontrivial, the (pseudo)code extends easily from our schema-building example. To drive that home, the code-diffs are highlighted in magenta.

[Figure: pseudocode for shredding, with the changes from the schema-building pseudocode highlighted]

It's not that big of a diff to get full shredding!

Concluding the implementation

We are now able to take a stream of documents and shred those documents into an in-memory columnar format complete with their structural metadata. We only ever need to look at each document once for this process, leading to a substantive performance improvement. What we have not explored, and won't in this blog post, are all the extra details we need to get this functional: swapping our in-memory representation to disk as needed to bound our memory usage, actually emitting our in-memory representation into the physical Parquet file (thanks to the apache-go library), and more.

Getting the new approach in production

While the shredder algorithm is a fascinating component, it is just one part of the larger process of delivering product improvements. Any large deliverable involves many people, and anything that impacts our users calls for meaningful validation. We'll conclude our discussion of shredding by placing its work and impact in this larger context.

Build together

This work benefited greatly from a number of collaborators. We were fortunate to be able to draw on a diverse range of experts for this work, and doubly so because so many of us were able to learn from each other along the way.
A distinguished engineer pushed for this project, drawing her analytics expertise to show how we could have a much more column-first perspective on parquet emission, ultimately leading us to adopt the official apache arrow project as our parquet-file-writing library, interfaced with via the above-described shredder. She also outlined a lot of the key requirements for our swap-to-disk procedures, in addition to teaching me and other colleagues an enormous amount about the analytics world. At the cross-team level, we sought out performance-measurement experts to help characterize what performance success should look like at a systems level. At the individual collaboration level, I was glad for the privilege of introducing several junior colleagues to performance work and how to get effective measurements. Their contributions included many other technical deliverables that substantially improved our product’s performance, and insightful ways of measuring performance. Validating the behavior Finally, for all the technical and collaborative work we’ve discussed, the value of the work is predicated on us getting this into production. The popularity and importance of our parquet-writing motivated this work and made us excited to release it as soon as possible. By the same token, we needed to release this without compromising stability or correctness. That’s part of being a developer data platform . Along with table-stakes validation like targeted unit-tests validating the core machinery, integration tests validating how that machinery is used by the larger product, and end-to-end tests validating the, well, end-to-end experience, we sought out synthetic and real-world datasets to make sure we handled input of all shapes and sizes. 
We also strove to validate our project from a customer’s perspective: people emit parquet files with the intention of using them elsewhere, so we extensively tested our produced parquet files with third-party readers to ensure their compatibility. Ultimately, the performance improvement brought on by this work is considerable. For our largest customers, our logging metrics indicate we’ve more than doubled our parquet-writing throughput. This is consistent with our customer-oriented internal benchmarks, and the effect is that we are now saving many compute-days per day servicing our customer requests. We believe this work has laid a strong foundation for our parquet-emission support. Thanks for reading!

1. For those familiar with the parquet-specific term, we are able to flush on each rowgroup, not just the whole file, but the issue and challenge remain the same. We write “file” here for simplicity.
2. Note that a typename, such as “.int32” or “.string”, will only appear at the end of a path.
3. Not all columns require these streams; here we’re referring to the general case. For more details, see the Dremel paper, cited below.
4. https://dl.acm.org/doi/abs/10.14778/1920841.1920886
5. https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
6. For those already familiar with parquet: technically the definition level range is only for names-that-could-possibly-be-missing, i.e., excluding fields marked REQUIRED. For our discussion and simplicity, we’re assuming all columns are OPTIONAL.
7. Things get a little more complicated when also considering repetition levels (multiple definition levels might map to one document), but again for brevity we do not discuss repetition levels.
8. Section 4.2, and Appendix A, of the Dremel paper.

April 11, 2023
Engineering Blog

Enhancing the .NET Development Experience with Roslyn Static Analysis

The MongoDB .NET/C# driver introduces idiomatic APIs for constructing queries and aggregations: LINQ and Builders. These APIs eliminate the need to write native MongoDB Query Language (MQL), but they also introduce some overhead when it comes to troubleshooting and optimizing the underlying MQL. Because the generated MQL cannot be inspected at compile time, troubleshooting queries involves outputting MQL at runtime and/or inspecting runtime exceptions. Given that MQL generation from a C# expression is basically transpiling, we knew that inferring the general form of MQL at compile time was, in theory, solvable by static analysis. This realization, and the fact that the .NET ecosystem has an amazing framework for writing static analyzers (Roslyn), made me excited to try out this idea during MongoDB Skunkworks week. In this article, I will share my experience of forming a plan for this project, crafting a quick proof-of-concept during Skunkworks week, and eventually releasing the first public version.

Skunkworks at MongoDB

One of my favorite perks of working at MongoDB is that we get a whole week, twice a year, to focus on our own projects. This week is a great opportunity to meet and collaborate with other folks in the company, try out any ideas we want, or learn something new. I started my Skunkworks week by refreshing my Roslyn skills. While a week sounds like a fair amount of time for rapid prototyping, naturally I still had to settle on just a small subset of all the cool features that came to mind. I was lucky and, by the end of the Skunkworks, I had a MongoDB Analyzer for .NET prototype sufficient to demonstrate the feasibility of this idea.

Roslyn analyzers

A significant part of the .NET ecosystem is the open source .NET Compiler Platform SDK (Roslyn API). This SDK is well integrated into the .NET build pipeline and IDEs (e.g., VS, Rider), which allows for the creation of tools for code analysis and generation.
The Roslyn SDK exposes the standard compiler's building blocks. The main ones that will be used in the Analyzer project are:

Abstract syntax tree (AST): Data structure representing the text of the analyzed code.
Symbol table: Data structure that holds information about variables, methods, classes, interfaces, types, and other language elements. Each node in the AST can have a corresponding symbol.
Emit API: API that allows you to generate new IL code dynamically and compile it to an in-memory assembly, which can be loaded and executed in the same application.

The Roslyn SDK provides a convenient API to develop and package a code analyzer, which can be easily integrated into a .NET project and executed as part of the build pipeline. Or, it can expose an interactive UI in an IDE, thereby enriching developers' experience and enforcing project-specific rules.

Design approach

The .NET/C# driver provides an API to render any LINQ or Builder expression to MQL. The next logical step is to identify the needed expressions and use the driver to extract the matching MQL. Extracting the Builders or LINQ expression syntax nodes from the syntax tree provided by Roslyn was fairly straightforward. The next step, therefore, is to create a new syntax tree and add these expression syntax nodes combined with MQL-generating syntax. Then, this new syntax tree is compiled into executable code, which is dynamically invoked to generate the MQL.
To optimize this process, the Analyzer maintains a template syntax tree containing sample MQL generation code from an expression:

    public class MQLGenerator
    {
        public static string RenderMQL()
        {
            var buildersDefinition = Builders<MqlGeneratorTemplateType>.Filter.Gt(p => p.Field, 10);
            return Renderer.Render(buildersDefinition);
        }
    }

From this template, a new single syntax tree is produced for each Analyzer run, by dynamically adding the RenderMQL_N method for each analyzed expression N, and replacing the expression placeholder with the analyzed expression:

    public static string RenderMQL_1()
    {
        var buildersDefinition = AnalyzedBuildersExpression;
        return Renderer.Render(buildersDefinition);
    }

Next, the compilation unit is created from the syntax tree containing all the analyzed expressions and emitted to an in-memory assembly (Figure 1). This assembly is loaded into the Analyzer AppDomain, from which the MQLGenerator object is instantiated, which provides the actual MQL by invoking the RenderMQL_N methods.

Figure 1: LINQ and Builder expressions extraction and MQL generation.

This approach posed four fundamental challenges, discussed below:

Data types resolution: Expressions are strongly typed, while the types are usually custom types that are defined in the user code.
Variables resolution: Expressions usually involve variables, constants, and external methods. The Analyzer cannot resolve those dependencies at compile time.
Driver versions: Different driver versions might render different MQL. The exact driver version referenced by the analyzed code has to be used.
Testing: The Roslyn out-of-the-box testing template lets you test analyzers on C# code provided as a simple string, which imposes significant maintainability challenges for a large number of tests.
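The template-and-compile flow described above (one generated method per analyzed expression, a single compilation, then dynamic invocation) can be illustrated with a small analogue. The sketch below uses Python's standard `ast` module rather than Roslyn, so it is an analogy and not the Analyzer's code. The names `render`, `TEMPLATE`, `build_module`, and `run_all` are hypothetical, and `render` merely stands in for the driver's renderer.

```python
import ast


def render(definition):
    """Hypothetical stand-in for the driver's renderer: returns a string form."""
    return repr(definition)


# Template function; the `None` placeholder is replaced by each analyzed expression.
TEMPLATE = (
    "def render_{n}():\n"
    "    definition = None\n"
    "    return render(definition)\n"
)


def build_module(expressions):
    """Build a single module AST holding one render_N function per expression."""
    functions = []
    for n, source in enumerate(expressions, start=1):
        func = ast.parse(TEMPLATE.format(n=n)).body[0]
        # Swap the placeholder for the parsed expression node.
        func.body[0].value = ast.parse(source, mode="eval").body
        functions.append(func)
    module = ast.Module(body=functions, type_ignores=[])
    return ast.fix_missing_locations(module)


def run_all(expressions):
    """Compile the generated module once, then invoke every render_N function."""
    namespace = {"render": render}
    exec(compile(build_module(expressions), "<generated>", "exec"), namespace)
    return [namespace[f"render_{n}"]() for n in range(1, len(expressions) + 1)]


print(run_all(["{'Field': {'$gt': 10}}", "[1, 2] + [3]"]))
```

Compiling all generated functions in one unit, as in the design above, pays the compilation cost once per run instead of once per expression.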
Data types resolution

Given a simple LINQ expression that retrieves all the movies produced by Christopher Nolan from the movies collection:

    var moviesCollection = db.GetCollection<Movie>("movies").AsQueryable();
    var movies = moviesCollection.Where(movie => movie.Producer == "Christopher Nolan");

The underlying Movie type, and all types Movie is dependent upon, must be ported into the Analyzer compilation space. All imported types must exactly reproduce the original namespace hierarchy. Expressions like db.GetCollection<Movie> must be rewritten with fully qualified names to avoid naming collisions and namespace-resolution problems. For example, user code could contain Namespace1.Movie and Namespace2.Movie. An additional problem with importing the types directly is the unbounded complexity of method and property implementations, which in most cases could not be compiled in the Analyzer compilation space. This excess code plays no role in MQL generation and must not be imported into the compilation unit.

We decided that an easier and cleaner solution was to create a unique type name for each referenced type under a single namespace. The Analyzer uses the semantic model to inspect the Movie type defined in the user’s code and creates a new MovieNew syntax node mirroring all Movie properties and fields. This process is repeated for each type referenced by Movie, including enums, arrays, and collections (Figure 2). After creating a MovieNew type as a syntax declaration, the original LINQ expression must be rewritten to reference the new type. Therefore, the original expression is transformed to a new expression: db.GetCollection<MovieNew>("movies").

Figure 2: LINQ and Builder expressions extraction, data types resolution and MQL generation.

Variables resolution

In practice, LINQ and Builders expressions mostly reference variables as opposed to simple constants.
For example:

    var movies = moviesCollection.Where(movie => movie.Title == movieName)

At runtime, the movieName value is resolved, and MQL is generated with a constant value. For example, the above expression can result in the following MQL:

    aggregate([{ "$match" : { "Title" : "Dunkirk" } }])

This constant value is not available to the Analyzer at compile time; therefore, we have to think of a workaround. Instead of presenting the constant, the Analyzer outputs the variable name:

    aggregate([{ "$match" : { "Title" : movieName } }])

As you can see, this technique does not produce valid MQL. But, most importantly, it preserves the MQL shape and contains the referenced variable information. This is done by replacing each external variable and method reference in the original expression by a unique constant, and substituting it back in the resulting MQL (Figure 3).

Figure 3: LINQ and Builder expressions extraction, constants remapping, data types resolution and MQL generation.

Driver versions

The naive approach would be to embed a fixed driver dependency into the Analyzer. However, this approach imposes some significant limitations, including:

MQL accuracy degradation: Different versions of the driver can produce slightly different MQL due to bug fixes and/or new features.
Backward compatibility: Expressions written with older driver versions might not be supported or result in different MQL.
Forward compatibility: The Analyzer would not be able to process new expressions supported by newer driver versions.

This issue can be resolved by releasing a new Analyzer version for each driver version, but ideally we wanted to avoid such development overhead. Luckily, instead of embedding a driver package with a fixed version into the Analyzer package, and limiting the Analyzer only to that specific driver version, the Analyzer uses the actual driver package that is used by the user’s project and found on the user's machine.
In this way, the Analyzer is “driver-version agnostic” in some sense. One of the challenges was to dynamically resolve the correct driver version for each compilation, as C# dynamic compilation tries to resolve the dependencies from the current AppDomain. To solve this, the Analyzer overrides the global AppDomain assembly resolution and loads the correct driver assemblies for each resolution request.

An additional nuance was to load the correct .NET framework version. Usually, the Analyzer runs on a different .NET platform than the project's .NET target (e.g., the Analyzer can run in VS on .NET Framework 4.7.2, while the analyzed project references the .NET Standard 2.1 driver). Luckily, all recent driver distributions contain the .NET Standard 2.0 version, which is supported by both .NET Core and .NET Framework platforms. The next step is to identify the physical location of the .NET Standard 2.0 driver assemblies with the correct version (Figure 4). This approach allows the Analyzer to be driver-version agnostic, including supporting future driver versions regardless of the OS platform (e.g., Rider on Linux/Mac, VS on Mac/Windows, .NET build on Linux/Mac/Windows).

Figure 4: LINQ and Builder expressions extraction, constants remapping, data types resolution, driver version resolution and MQL generation.

Testing

Writing tests for such a project requires an unorthodox testing methodology as well. The Roslyn SDK provides a testing framework for writing integration tests. An integration test receives a C# code snippet to be analyzed, supplied as a string, and then executes the Analyzer on it. This default testing methodology introduces some inconveniences. For example, writing and maintaining hundreds of test cases, each testing multi-line C# code involving complex data types, as plain strings without compiler support involves quite the overhead. Therefore, we extended the testing framework by creating a custom test runner in the following way.
All the C# code for the integration tests is written as a standalone C# project, which is compiled in a standard way. Common underlying data types and other code elements are easily reused. An intended test method is marked by a custom attribute denoting the expected result. An additional test project references the former project and uses reflection to identify the test cases denoted by special attributes. Then, it executes the Analyzer on the test cases’ C# files with the appropriate driver version and validates the results. For example, for the LINQ expression .Where(u => u.Name.Trim() == "123"), we expect the Analyzer to produce a warning for LINQ2 and valid MQL for LINQ3. The test case is written in the following way:

    [NotSupportedLinq2("Supported in LINQ3 only: db.coll.Aggregate([{ \"$match\" : { \"Name\" : /^\\s*(?!\\s)123(?<!\\s)\\s*$/s } }])")]
    [MQLLinq3("db.coll.Aggregate([{ \"$match\" : { \"Name\" : /^\\s*(?!\\s)123(?<!\\s)\\s*$/s } }])")]
    public void String_methods_Trim()
    {
        _ = GetMongoQueryable()
            .Where(u => u.Name.Trim() == "123");
    }

The Analyzer testing framework parses the C# test cases project and creates a test case for each (DriverVersion, LinqProviderVersion, TestCase) combination (as shown in Figure 5):

Figure 5: Test cases dynamically generated from C# code for each tested driver version, as discovered in Visual Studio Test Explorer.

This approach allows smooth integration with the VS test runner and a seamless development experience. Besides significantly increasing maintainability and readability, this approach also introduces a bonus feature. The test code project can be opened as a standalone solution (without the test framework), and the Analyzer output can be visually inspected for each test case as a user would see it.

From initial idea to first release

Because the Skunkworks project proved to be successful, the decision was made to develop a public first release.
Generally, developing and releasing a greenfield product in most companies is a lengthy process, which involves resource allocation and planning, productizing, marketing, quality assurance, developing appropriate documentation, and support. At MongoDB, however, this process was incredibly fast. We formed a remote ad hoc team, across two continents, involving product management, documentation experts, developer relations, marketing specialists, and developers. Despite the fact that we were working together as a team for the first time, the collaboration level was amazing, and the high level of professionalism and motivation allowed everybody to do their part extremely efficiently with almost zero overhead. As a result, we developed and released a full working product, documentation, marketing materials, and support environment in less than three months.

Learn more about our internal Skunkworks hackathon and some of the projects MongoDB engineers built this year.

November 17, 2022
Engineering Blog

Integrating Support for Non-Volatile Memory Into WiredTiger

Intel Optane DC Persistent Memory is a non-volatile memory (NVRAM) product that resembles both storage and memory and can be used as either. Like storage, Optane NVRAM retains data after a crash or power outage. Like memory, it sits on the memory bus and can be accessed by the CPU using load/store instructions. In certain scenarios, its access latency even approaches that of dynamic random access memory (DRAM). At MongoDB, we have been thinking about how to use NVRAM in the storage engine. It can be seen as an extension of volatile DRAM, but a denser and cheaper one. In pursuit of this goal, we extended our storage engine, WiredTiger, with a volatile NVRAM cache that retains frequently used file blocks. In this article, we share our experience, describe the lessons learned, and evaluate the costs and benefits of this approach.

How to use NVRAM in the storage stack

Optane NVRAM can act as both storage and memory. The persistent memory fabric itself can be packaged as a solid-state drive (SSD), as in Optane SSDs, or as a dual-inline memory module (DIMM) that looks almost like its DRAM counterpart and lives in the same type of slot on the motherboard. Even when NVRAM is packaged as a non-volatile DIMM (NVDIMM), we can ask the operating system to present it as a block device, put a file system on top, and use it just like regular storage. Broadly speaking, there are three ways to use NVRAM:

As regular storage
As persistent memory
As an extension to volatile memory

NVRAM as storage

Using NVRAM as regular storage can deliver superior throughput (compared to SSD) for read-dominant workloads, but this approach hinders write-dominant workloads because of Optane NVRAM’s limited write throughput (see the section “Performance properties of Optane NVRAM”). In any case, both the price and density of NVRAM are closer to those of DRAM than to those of SSD, so using it as storage is not recommended.
NVRAM as persistent memory

Imagine that all your data structures live in memory and that you never have to worry about saving them to files. They are just there, even after you quit your application or if it suffers a crash. Although this setup sounds simple, in practice, it is still challenging to program for this model. If your system crashes and you would like to be able to find your data after restart, you need to name it. A variable name is not sufficient, because it is not unique; thus, you have to restructure your code to make sure your data has persistent identifiers. The Persistent Memory Development Kit (PMDK) provides APIs for that.

A more difficult problem is surviving a crash. Your program may crash in the middle of a logical operation on a data structure. For example, suppose you are inserting an item into a linked list, and you have set the source pointer, but the crash occurs before setting the destination pointer. Upon restart, you’ll end up with corrupted data. To make matters worse, even if the logical operation had completed before the crash, the data might have been written only to CPU caches but not persisted to the memory itself. One solution is to wrap memory operations in transactions; however, programming transactional memory is notoriously difficult. Another solution is to use prepackaged data structures and APIs, but if you are looking to create your own highly optimized data structures, you must implement your own logging and recovery or other mechanisms that protect your data similarly to transactions.

NVRAM as an extension of volatile memory

Somewhat counterintuitively, this option involves disregarding the persistence of NVRAM and using it as a volatile extension of DRAM. Why would you want to do that? Suppose you have a fixed budget to buy extra memory for your system.
You can either afford N GB of DRAM or about M*N GB of NVRAM, because NVRAM is denser and cheaper per byte than DRAM (about three times cheaper, at the time of writing). Depending on your application, you might be better off in terms of performance/$ if you buy additional NVRAM, as opposed to DRAM. In support of this use case, Intel provides a hardware mechanism, called Memory Mode, which treats NVRAM as “regular” system memory and uses DRAM as its cache. In other words, the hardware will do its best to place frequently used data structures in DRAM, and the rest will reside in NVRAM. The beauty of this mechanism is that it requires absolutely no changes to applications. The downside is that it may perform worse than a custom solution for certain workloads (see section “How NVCache affects performance”). Our solution is a custom-built volatile cache that resides in NVRAM.

Our architecture

Our NVRAM cache (or NVCache) is a component of the MongoDB storage engine WiredTiger. For persistent storage, WiredTiger organizes data into blocks, where keys and values are efficiently encoded and (optionally) compressed and encrypted. For fast querying of its B+-tree data structure, WiredTiger transforms blocks into pages, where keys/values are decoded and indexed. It keeps pages in its DRAM page cache.

Figure 1. The architecture of NVCache.

Figure 1 shows the architecture of NVCache. NVCache is the new component, and the rest are part of WiredTiger. NVCache sits next to the block manager, which is the code responsible for reading/writing the data from/to persistent storage. Let’s look at each path in turn.

Read path: If the page cache cannot locate the searched-for data, it issues a read to the block manager (1). The block manager checks whether the block is present in NVCache (2), accesses it from NVCache if it is (3), and reads it from disk if it is not (4).
The block manager then transforms the block into a page, decrypting and decompressing it if needed, and then hands it over to the page cache (5). It also notifies NVCache that it has read a new block, and NVCache then has the discretion to accept it (6). NVCache stores the blocks in the same format as they are stored on disk (e.g., compressed or encrypted if those configuration options were chosen).

Write path: The write path differs from the read path in that WiredTiger does not modify disk blocks in place. It writes updates into in-memory data structures and then converts them into new pages, which would be sent to disk either during eviction from the page cache or during a checkpoint (7). When the block manager receives a new page, it converts it into a new block, writes the block to storage (8), and informs NVCache (9). NVCache then has the discretion to accept it. Obsolete blocks are eventually freed, at which time the block manager instructs NVCache to invalidate cached copies (10).

To avoid running out of space, NVCache periodically evicts less-used blocks. The eviction thread runs once a second. Overall, this design is straightforward, but making it performant was a challenge. As expected with brand new storage or memory devices, the software must cater to their unique performance properties. In the next section, we focus on these performance features and explain how we adapted our cache to play along.

Performance properties of Optane NVRAM

In low-bandwidth scenarios, the access latency of Optane NVRAM approaches that of DRAM. A small read takes about 160 to 300 nanoseconds, depending on whether it is part of a sequential or a random access pattern [1]; a read from DRAM takes about 90 nanoseconds [3]. Small writes are as fast as in DRAM [3] because the data only has to reach the memory controller, where it will be automatically persisted in case of a power loss. In high-bandwidth scenarios, we usually look at throughput.
Sequential read throughput is about 6 GB/s for a single NVDIMM [1,2] and scales linearly as you add more memory modules. (A single 2nd Generation Intel Xeon Scalable processor can support up to six NVDIMMs.) The write throughput is more limited: We observed up to 0.6 GB/s on a single NVDIMM [2], and others observed up to 2.3 GB/s [3]. Again, if your workload writes to different NVDIMMs, the throughput will scale with the number of modules in your system. A somewhat troublesome observation about write throughput is that it scales negatively as you add more threads. Write throughput peaks at one or two concurrent threads and then drops as more threads are added [2,3]. More importantly, we were surprised to find that, on Optane NVRAM, the presence of writers disproportionately affects the throughput of readers.

Figure 2. Read throughput in the presence of concurrent writer threads.

Figure 2 shows how the throughput of eight reader threads drops as more concurrent writers are added. Although this effect is present on both DRAM and NVRAM (and certainly on other storage devices), on Optane NVRAM, the effect is much more pronounced. Performance of reads will suffer in the presence of writes. This important observation drove the design of our NVCache.

Throttling writes in caches for Optane NVRAM

For a cache to be useful, it must contain popular data. The duties of admitting fresh data and expunging the old fall on cache admission and eviction policies, respectively. Both admission and eviction generate writes, and, because writes hurt the performance of reads on Optane, admission and eviction will interfere with the performance of cache retrievals (which involve reads). Thus, we have a trade-off: On one hand, admission and eviction are crucial to making the cache useful. On the other hand, the write operations that they generate will hamper the performance of data retrievals, thereby making the cache less performant.
To resolve this tension, we introduced the Overhead Bypass (OBP) metric, which is the ratio of writes (block insertions and deletions) to reads (block lookups) applied to the cache. Keeping this ratio under a threshold allowed us to limit the overhead of writes:

    OBP = (blocks_inserted + blocks_deleted) / blocks_looked_up

Intuitively, blocks_looked_up correlates with the benefit of using the cache, whereas the sum of blocks_inserted and blocks_deleted correlates with the cost. NVCache throttles admission and eviction to keep this ratio under 10%. (Our source code is available in the WiredTiger public GitHub repository.)

Without OBP, the sheer overhead of data admission and eviction was quite substantial. To measure this overhead in its purest form, we experimented with workloads that do not stand to benefit from any extra caching, such as those with small datasets that fit into the OS buffer cache (in DRAM) or those that perform so many writes that they quickly invalidate any cached data. We found that using NVCache without the OBP feature caused these workloads to run up to two times slower than without the cache. Introducing the OBP completely eliminated the overhead and enabled the workloads that stand to benefit from extra caching to enjoy better performance.

How NVCache affects performance

In this section, we’ll look in detail at the performance of workloads with large datasets that stand to benefit from an additional cache.

Experimental system: The following experiments were performed on a Lenovo ThinkSystem SR360 with two Intel Xeon Gold 5218 CPUs. Each CPU has 16 hyper-threaded cores. The system has two Intel Optane persistent memory modules of 126 GB each. For storage, we used an Intel Optane P4800X SSD. We configured our system with only 32 GB of DRAM to make sure that extra memory in the form of NVRAM would be called for. We present the data with widely used YCSB benchmarks [4,5] (Table 1), although we also performed analysis with our in-house benchmarks and reached similar conclusions.

Table 1.
Characteristics of YCSB benchmarks

The following charts compare the throughput of YCSB with NVCache, with Intel Memory Mode (MM), and with OpenCAS [6], a kernel implementation of an NVRAM-resident cache from Intel. OpenCAS was configured in the write-around mode, which was the best option for limiting the harmful effect of writes [7]. Figures 3a-c show the data in configurations using 63 GB, 126 GB, and 252 GB of NVRAM, respectively.

Figure 3. Throughput of YCSB under Memory Mode (MM), OpenCAS, and NVCache relative to running with DRAM only.

We make the following three observations:

The OpenCAS cache delivers no performance benefit from extra NVRAM. It achieves a similar or better read hit rate than NVCache but also makes two orders of magnitude more writes to NVRAM, probably because it does not throttle the rate of admission. Writes interfere with the performance of reads, which is probably why this cache delivers no performance benefits.

When the dataset size exceeds NVRAM capacity, NVCache provides substantially better performance than Memory Mode. As shown in Figure 3a, NVCache outperforms Memory Mode by between 30% (for YCSB-B) and 169% (for YCSB-C). Furthermore, Memory Mode hurts YCSB-A’s update throughput by about 18% relative to the DRAM-only baseline, while NVCache does not.

Memory Mode performs comparably to NVCache when NVRAM is ample. With 252 GB of NVRAM, all datasets comfortably fit into the NVRAM. Two factors explain why NVCache loses its edge over MM with ample NVRAM: (1) For NVCache, the marginal utility of additional NVRAM is small after 126 GB; the NVCache hit rate grows by about 20% when we increase NVRAM size from 63 GB to 126 GB, but only by another 5% if we increase it from 126 GB to 252 GB. (2) While MM allows the kernel buffer cache to expand into NVRAM, NVCache confines it to DRAM, which is also used by WiredTiger’s page cache. Contention for DRAM limits performance.
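The admission throttling that separates NVCache from OpenCAS in the first observation can be illustrated with a toy model of the OBP check. This is a simplified sketch, not WiredTiger's implementation: the class and method names are hypothetical, and only the ratio (blocks_inserted + blocks_deleted) / blocks_looked_up and the 10% threshold are taken from the text. It also assumes, for simplicity, that no write is admitted before the first lookup.

```python
class ObpThrottle:
    """Toy model of an Overhead Bypass (OBP) check; all names are hypothetical."""

    def __init__(self, threshold=0.10):
        self.threshold = threshold      # keep write overhead under 10% of lookups
        self.blocks_inserted = 0
        self.blocks_deleted = 0
        self.blocks_looked_up = 0

    def record_lookup(self):
        """Count a cache lookup (the 'benefit' side of the ratio)."""
        self.blocks_looked_up += 1

    def _write_allowed(self):
        """Return True if one more write keeps the ratio under the threshold."""
        if self.blocks_looked_up == 0:
            return False                # assumption: no writes before any lookup
        writes = self.blocks_inserted + self.blocks_deleted + 1
        return writes / self.blocks_looked_up < self.threshold

    def try_insert(self):
        """Admit a block only if the projected overhead stays under the threshold."""
        if not self._write_allowed():
            return False
        self.blocks_inserted += 1
        return True

    def try_evict(self):
        """Evict a block only if the projected overhead stays under the threshold."""
        if not self._write_allowed():
            return False
        self.blocks_deleted += 1
        return True


throttle = ObpThrottle()
for _ in range(100):
    throttle.record_lookup()
admitted = sum(throttle.try_insert() for _ in range(20))
print(admitted)
```

With 100 lookups recorded, the model stops admitting blocks once one more write would bring the ratio to 10%, so most of the 20 attempted insertions are bypassed.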
Overall, the benefit of a custom NVRAM cache solution is that it provides better performance than Memory Mode for large workloads. The disadvantage is that it requires new software, whereas MM can be used without any changes to applications.

Performance and cost

In this section, we explore the trade-offs of using Optane NVRAM as a volatile extension of DRAM versus just using more DRAM. To that end, we take a fixed memory budget of 96 GB and vary the fraction satisfied by DRAM and NVRAM as shown in Table 2.

Table 2. Budget of memory configurations containing both DRAM and NVRAM relative to DRAM-only. We use the NVRAM-to-DRAM price ratio of 0.38 [8].

Figure 4. Performance per dollar as the amount of NVRAM increases and the amount of DRAM decreases (in YCSB workloads).

Figure 4 shows the performance of YCSB under these configurations normalized to using 96 GB DRAM and divided by the cost ratio in column 3. In other words, these are performance/$ numbers relative to the DRAM-only configuration. In these experiments, we used only NVCache to manage NVRAM, as it performed comparably to or better than other options. Positive numbers mean that the performance decreased less than the memory cost. Read-only or read-mostly workloads that benefit from the NVCache experience a positive gain, as expected. Although in most cases performance predictably drops as the amount of DRAM decreases, YCSB-C in the configuration with 64 GB NVRAM and 32 GB DRAM performs better than it does with 96 GB DRAM, so we decrease the system cost and improve performance in absolute terms. This occurs because beyond 32 GB of DRAM, the utility of additional memory (and a larger page cache) is considerably smaller than the loss in performance due to a smaller NVCache. YCSB-A, whose write intensity prevents it from deriving benefits from any additional caching, suffers an overall loss in terms of performance/$.
Its performance drops at a steeper rate than the memory cost as we decrease the amount of DRAM. We conclude that NVRAM is a cost-effective method of reducing memory cost while balancing the impact on performance for read-dominant workloads. At the same time, even a modest presence of writes can render NVRAM unprofitable relative to DRAM.

References

[1] J. Izraelevitz, et al. Basic Performance Measurements of the Intel Optane DC Persistent Memory Module. arXiv:1903.05714.
[2] Sasha Fedorova. We Replaced an SSD with Storage Class Memory. Here is What We Learned. The MongoDB Engineering Journal.
[3] Jian Yang, et al. An Empirical Guide to the Behavior and Use of Scalable Persistent Memory. USENIX Conference on File and Storage Technologies (FAST 2020).
[4] Yahoo! Cloud Serving Benchmark, Git Repo.
[5] B.F. Cooper, et al. Benchmarking Cloud Serving Systems with YCSB. SoCC '10: Proceedings of the 1st ACM Symposium on Cloud Computing.
[6] Open Cache Acceleration Software.
[7] Open CAS Linux: Admin Guide.
[8] H.T. Kassa, et al. Improving Performance of Flash Based Key-value Stores Using Storage Class Memory as a Volatile Memory Extension. USENIX Annual Technical Conference (USENIX ATC 21).

July 25, 2022
Engineering Blog

We Replaced an SSD with Storage Class Memory; Here is What We Learned

On April 2, 2019 Intel Optane Persistent Memory became the first commercially available storage class memory (SCM) product. Like SSD, this memory is persistent, and like DRAM it sits on the memory bus. Long before a commercial release, system architects pondered how exactly SCM fits in the storage hierarchy, and now came an opportunity to perform concrete measurements. One question we wanted to answer is whether a storage device sitting on a memory bus can deliver better throughput than an equivalent storage device sitting on a PCI-e. There are two ways to access SCM: byte interface and block interface. Byte interface allows using load and store instructions, just like with DRAM. Block interface exposes SCM as a block device, optionally with a file system on top: this way it can be accessed just like a conventional SSD. The load/store API is streamlined, because nothing stands between the application and the hardware, but also tricky to use, because it does not come with features like crash consistency, the way file system API usually does. Accessing SCM via the block or file system API comes with OS overhead, but there is no need to rewrite applications. WiredTiger, MongoDB’s storage engine that we evaluated in this article, reads and writes data to/from storage using sizeable blocks (typically 4KB or larger). Besides being a necessity on conventional storage hardware today, using block API has other practical advantages. For example, compression and encryption, features that customers covet, are optimized to work with blocks. Similarly, checksums that safeguard from data corruption are computed on blocks of data. Furthermore, WiredTiger caches blocks of data in its DRAM-resident cache, which together with the OS buffer cache is a boon to performance. Block-orientedness and reliance on caching positioned WiredTiger, like many other storage engines, to effectively hide latency of slow storage devices . 
As a result, our experiments revealed that a modest latency advantage that SCM provides over a technology-equivalent SSD does not translate into performance advantages for realistic workloads. The storage engine effectively masks these latency differences. SCM will shine when it is used for latency-sensitive operations that cannot be hidden with batching and caching, such as logging. In the rest of the article we detail the results of the experiments that led us to this conclusion.

Experimental Platform

We experimented with two storage devices: Intel Optane DC Persistent Memory (PM) and Intel Optane P4800X SSD. Both are built with the Intel Optane 3D XPoint non-volatile memory, but the former is an SCM that sits on the memory bus while the latter is a PCIe-attached SSD.

Microbenchmarks

To begin with, we gauged raw device bandwidth with microbenchmarks that read or write a 32GB file using 8KB blocks. We vary the number of threads simultaneously accessing the file, each working on its own chunk. A file can be accessed either via system calls (read/write) or mmap; the latter method usually has less overhead.

SSD drive's raw performance meets the spec

According to the spec, our P4800X drive is capable of up to 2.5GB/s sequential read bandwidth and up to 2.2GB/s sequential write bandwidth. Here are the numbers we observed via the Ubuntu Linux (5.3 kernel) raw device API, meaning that the data bypasses the OS buffer cache.

Raw SSD performance, sequential reads and writes.

The read bandwidth behaves according to the specification as long as we use at least two threads. The write bandwidth, unexpectedly, exceeds its specified upper bound when using multiple threads. We suspect that this could be due to buffering writes either at the OS or at the device.

The Optane P4800X SSD is faster than a typical SSD at the time of this writing, but not to the point of being incomparable.
While the Optane SSD offers up to 2.5GB/s of sequential read bandwidth, a typical NAND SSD (e.g., Intel SSD Pro 6000p series) offers up to 1.8GB/s. The difference is more noticeable with writes. The Optane drive can deliver up to 2.2 GB/s, while the NAND drive can do no more than 0.56 GB/s.

Random reads and writes on the Optane SSD are not that much worse than sequential ones. We are able to achieve (using mmap) close to peak sequential throughput with reads and only 10% short of peak sequential throughput for writes. Near-identical performance of sequential and random access is a known feature of these drives.

SCM offers high-bandwidth reads, but low-bandwidth writes

Now let us look at the raw performance of SCM. Intel systems supporting Optane PM can fit up to six DIMMs; our experimental system had only two. We measured the throughput on a single DIMM and two DIMMs used together, to extrapolate scaling as the number of DIMMs increases. We also relied on data from other researchers to confirm our extrapolation.

There are two ways to obtain direct access to PM: (1) devdax, where a PM module is exposed as a character device, and (2) fsdax, where a file system sits on top of a PM module masquerading as a block device, but file accesses bypass the buffer cache via the Direct Access (DAX) mode. In our experiments we used the ext4 file system.

The following chart shows the throughput of sequential reads and writes obtained via these access methods. In all cases we use mmap, because that is the only method supported by devdax. Sequential read bandwidth of a single PM module reaches about 6.4 GB/s; that matches observations of other researchers. Random access behaves almost identically to sequential access, so we omit the charts.

Storage class memory, sequential reads, single PMEM device.
Storage class memory, sequential writes, single PMEM device.

Write experiments tell a different story. The single-module write throughput achieves a mere 0.6 GB/s.
This measurement does not agree with the data from the UCSD researchers who observed around 2.3GB/s write bandwidth on a single device. Further investigation led us to believe that this was due to differences in hardware versions. That said, our observations reveal that a single PM module achieves write throughput comparable only to a NAND SSD.

Next, let’s look at scaling across two devices. The following figure shows the measurements for sequential reads and writes, using mmap over fsdax. We used the Linux striped device mapper to spread the load across two DIMMs. For reads, with two DIMMs we can almost double the peak read bandwidth, from 6.4 GB/s with one DIMM to 12.4 GB/s with two. Similarly, researchers at UCSD observed nearly linear scaling across six DIMMs.

Storage class memory, sequential reads, comparison between one and two PMEM devices.
Storage class memory, sequential writes, comparison between one and two PMEM devices.

For writes, we achieve nearly 1 GB/s of write throughput with two DIMMs relative to 0.6 GB/s with one, but the scaling is less than linear if we can extrapolate from a single data point. The UCSD researchers observed that bandwidth with six DIMMs improved by 5.6x relative to using a single DIMM, which is in line with our observation. Extrapolating from these data points, if our system had six DIMMs, we’d observe around 3.4 GB/s of peak write bandwidth, which is about 50% better than the Optane SSD.

In summary, with bare device access we see about 2.5 GB/s of peak read bandwidth on the Optane SSD and about 6.4 GB/s on a single Optane PM module. With six modules, the throughput would be ~38GB/s. Write throughput was only 0.6 GB/s on a single PM module, projected to reach 3.4 GB/s with six, while the Optane SSD reached 2.2 GB/s write bandwidth.
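The extrapolation behind these projections is simple arithmetic and can be reproduced as follows. This is only a worked check of the numbers quoted above: the variable names are ours, and linear read scaling to six DIMMs is an assumption based on the near-linear scaling reported by the UCSD researchers.

```python
# Measured or quoted figures from the text, in GB/s.
pm_read_one_dimm = 6.4
pm_read_two_dimms = 12.4
pm_write_one_dimm = 0.6
pm_write_two_dimms = 1.0   # "nearly 1 GB/s"
ssd_write = 2.2

DIMMS = 6
UCSD_WRITE_SCALING_SIX_DIMMS = 5.6  # speedup over one DIMM, as reported

# Observed scaling from one to two DIMMs.
read_scaling_two = pm_read_two_dimms / pm_read_one_dimm     # close to 2x
write_scaling_two = pm_write_two_dimms / pm_write_one_dimm  # well below 2x

# Projections for a six-DIMM system (reads assumed to scale linearly).
projected_read = pm_read_one_dimm * DIMMS
projected_write = pm_write_one_dimm * UCSD_WRITE_SCALING_SIX_DIMMS

# Projected write advantage of six PM modules over the Optane SSD.
write_advantage = projected_write / ssd_write - 1.0
```

With these inputs the projected read bandwidth is about 38 GB/s and the projected write bandwidth about 3.4 GB/s, roughly 50% above the SSD's 2.2 GB/s.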
Optane SCM has a significant edge over the SSD with respect to reads, and a small advantage in writes, provided you can afford six PM modules; otherwise, an SSD will deliver a higher write throughput.

Software caching attenuates SCM performance advantage

While SCM is closer to the speed of DRAM than traditional storage media, DRAM is still faster, so advantages of DRAM caching are difficult to overlook. The following charts show that with the buffer cache on (here we are using ext4 without the DAX option), all devices perform roughly the same, regardless of whether we are doing reads or writes, random or sequential access. These experiments were done with the warm buffer cache, i.e., the file was already in the buffer cache before we began the experiment, so here we are measuring pure DRAM performance. With access to data taking less time, software overheads become more evident, which is why mmap is much faster than system calls if we use eight or more threads.

Sequential reads on SSD and SCM with a warm buffer cache.
Random reads on SSD and SCM with a warm buffer cache.
Sequential writes on SSD and SCM with a warm buffer cache.
Random writes on SSD and SCM with a warm buffer cache.

If we begin each experiment with a cold buffer cache, the difference between the devices is still visible, but less apparent than if we bypass the buffer cache altogether. With a cold buffer cache, on the read path the OS has to copy the data from the storage device into the buffer cache before making it available to the application (hence extra software overhead). Furthermore, with the buffer cache on, the OS is not using huge pages. These factors dampen the raw read advantage of SCM. For writes, whereas SCM used to deliver lower bandwidth than SSD with raw access, now SCM outpaces SSD, likely because the buffer cache absorbs and batches some of the writes instead of flushing each one to the device immediately.

Sequential reads on SSD and SCM with a cold buffer cache.
Random reads on SSD and SCM with a cold buffer cache.
Sequential writes on SSD and SCM with a cold buffer cache.
Random writes on SSD and SCM with a cold buffer cache.

Experiments with the storage engine

Like most storage engines, WiredTiger was designed and tuned to leverage a DRAM cache. Both the WiredTiger internal cache and the OS buffer cache are crucial for performance in all workloads we measured. Running WiredTiger without the OS buffer cache (fsdax mode) reduced its performance by up to 30x in our experiments; hence, we did not use the direct-access mode.

We ran WiredTiger’s wtperf benchmark suite, which was designed to stress various parts of the system and emulate typical workloads observed in practice. The WiredTiger internal cache size varies from a few to a hundred gigabytes across the benchmarks, and most benchmarks use at least a dozen threads. (Detailed configuration parameters for all benchmarks can be viewed here.) There is no locking in WiredTiger on the common path, so thread-level concurrency usually translates into high CPU utilization and concurrent I/O. As described in our previous blog post, we added a feature to WiredTiger to use mmap for I/O instead of system calls.

The following chart shows the performance of the wtperf suite on Intel Optane SCM (one and two modules) and on the Optane SSD. We see no significant performance difference between SCM and SSD. Apart from one write-intensive benchmark, evict-btree-1, which is faster on SSD, there are no statistically significant differences between the two. Using a dual-module SCM over a single-module SCM gives no performance advantage either. While SCM has a higher bandwidth than a technology-equivalent SSD, the advantage is within an order of magnitude, and, as it turns out, effective DRAM caching hides that difference.

Latency, and not bandwidth, is where SCM can shine.
In contrast to bandwidth, the latency of reading a block of data from an Optane PM is two orders of magnitude shorter than reading it from an Optane SSD: 1 microsecond vs 100-200 microseconds. The most obvious place in a storage engine where latency could be the bottleneck is logging, and academic literature is rife with successful examples of using Optane PM for logging. In the meantime, stay tuned for our next report on exploring persistent memory.

WiredTiger benchmarks on SSD and SCM. Group 1.
WiredTiger benchmarks on SSD and SCM. Group 2.
WiredTiger benchmarks on SSD and SCM. Group 3.

If you found this interesting, be sure to tweet it. Also, don't forget to follow us for regular updates.

August 27, 2020
Engineering Blog

Getting Storage Engines Ready for Fast Storage Devices

Over the past two decades, performance of storage hardware increased by two orders of magnitude. First, with the introduction of solid state drives (SSD), then with the transition from SATA to PCIe, and finally with the innovation in non-volatile memory technology and the manufacturing process [ 1 , 7 ]. More recently, in April 2019, Intel released the first commercial Storage Class Memory (SCM). Its Optane DC Persistent Memory, built with 3D XPoint technology, sits on a memory bus and further reduces I/O latency [ 2 ]. While device access used to dominate I/O latency, the cost of navigating the software stack of a storage system is becoming more prominent as devices’ access time shrinks. This is resulting in a flurry of academic research and in changes to commercially used operating systems (OS) and file systems. Despite these efforts, mainstream system software is failing to keep up with rapidly evolving hardware. Studies [ 4 , 5 , 6 ] have shown that file system and other OS overhead still dominates the cost of I/O in very fast storage devices, such as SCMs. In response to these challenges, academics proposed a new user-level file system, SplitFS [ 6 ], that substantially reduces these overheads. Unfortunately, adopting a user-level file system is not a viable option for many commercial products. Apart from concerns about correctness, stability, and maintenance, adoption of SplitFS would restrict portability, as it only runs on Linux and only on top of the ext4-DAX file system. Fortunately, there IS something that can be done in software storage engines that care about I/O performance. Within MongoDB’s storage engine, WiredTiger, we were able to essentially remove the brakes that the file system applied to our performance without sacrificing the convenience it provides or losing portability. Our changes rely on using memory-mapped files for I/O and batching expensive file system operations. 
These changes resulted in up to 63% performance improvements for 19 out of 65 benchmarks on mainstream SSDs.

Streamlining I/O in WiredTiger

Our changes to WiredTiger were inspired by a study from UCSD [4], where the authors demonstrated that by using memory-mapped files for I/O and by pre-allocating some extra space in the file whenever it needed to grow, they could achieve almost the same performance as if the file system was completely absent.

Memory-mapped files

Memory-mapped files work as follows. The application makes an mmap system call, whereby it requests the operating system to “map” a chunk of its virtual address space to a same-sized chunk in the file of its choice (Step 1 in Fig. 1). When it accesses memory in that portion of the virtual address space for the first time (e.g., virtual page 0xABCD in Fig. 1), the following events take place:

1. Since this is a virtual address that has not been accessed before, the hardware will generate a trap and transfer control to the operating system.
2. The operating system will determine that this is a valid virtual address, ask the file system to read the corresponding page-sized part of the file into its buffer cache, and create a page table entry mapping the user virtual page to the physical page in the buffer cache (e.g., physical page 0xFEDC in Fig. 1), where that part of the file resides (Step 2 in Fig. 1).
3. Finally, the virtual-to-physical translation will be inserted into the Translation Lookaside Buffer (TLB, a hardware cache for these translations), and the application will proceed with the data access.
Fig. 1. Memory-mapped files work as follows: (1) they establish a virtual memory area for the mapped file, (2) they place the virtual-to-physical address translation into the page table, (3) they cache the translation in the Translation Lookaside Buffer (TLB).

Subsequent accesses to the same virtual page may or may not require operating system involvement, depending on the following:

1. If the physical page containing the file data is still in the buffer cache and the page table entry is in the TLB, operating system involvement is NOT necessary, and the data will be accessed using regular load or store instructions.
2. If the page containing the file data is still in the buffer cache, but the TLB entry was evicted, the hardware will transition into kernel mode, walk the page table to find the entry (assuming x86 architecture), install it into the TLB and then let the software access the data using regular load or store instructions.
3. If the page containing the file data is not in the buffer cache, the hardware will trap into the OS, which will ask the file system to fetch the page, set up the page table entry, and proceed as in scenario 2.

In contrast, system calls cross the user/kernel boundary every time we need to access a file. Even though memory-mapped I/O also crosses the user/kernel boundary in the second and third scenarios described above, the path it takes through the system stack is more efficient than that taken by system calls. Dispatching and returning from a system call adds CPU overhead that memory-mapped I/O does not have [8].

Furthermore, if the data is copied from the memory-mapped file area to another application buffer, it would typically use a highly optimized AVX-based implementation of memcpy. When the data is copied from the kernel space into the user space via a system call, the kernel has to use a less efficient implementation, because the kernel does not use AVX registers [8].
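The two access paths contrasted above can be illustrated with a short sketch. Python is used here only for brevity (WiredTiger itself is written in C), and the file name, block size and helper names are hypothetical. It assumes a POSIX system, since os.pread is not available on every platform. The first helper issues a system call for every read, while the second copies bytes out of a mapped region, involving the kernel only when a page fault occurs.

```python
import mmap
import os
import tempfile

BLOCK = 8192  # hypothetical block size for this sketch


def read_block_syscall(fd, offset, length):
    """Read through the kernel: one pread system call per access."""
    return os.pread(fd, length, offset)


def read_block_mmap(mapped, offset, length):
    """Copy from the mapped region; the kernel is involved only on a page fault."""
    return mapped[offset:offset + length]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.bin")
    with open(path, "wb") as f:
        f.write(os.urandom(4 * BLOCK))

    fd = os.open(path, os.O_RDONLY)
    try:
        # Length 0 maps the whole file, read-only.
        with mmap.mmap(fd, 0, access=mmap.ACCESS_READ) as mapped:
            via_syscall = read_block_syscall(fd, 2 * BLOCK, BLOCK)
            via_mmap = read_block_mmap(mapped, 2 * BLOCK, BLOCK)
    finally:
        os.close(fd)
```

Both helpers return the same bytes; the difference lies in how often control crosses into the kernel, which is the overhead the text describes.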
Pre-allocating file space Memory-mapped files allow us to substantially reduce the involvement of the OS and the file system when accessing a fixed-sized file. If the file grows, however, we do need to involve the file system. The file system will update the file metadata to indicate its new size and ensure that these updates survive crashes. Ensuring crash consistency is especially expensive, because each journal record must be persisted to storage to make sure it is not lost in the event of a crash. If we grow a file piecemeal, we incur that overhead quite often. That is why the authors of SplitFS [ 6 ] and the authors of the UCSD study [ 4 ] both pre-allocate a large chunk of the file when an application extends it. In essence, this strategy batches file system operations to reduce their overhead. Our Implementation The team applied these ideas to WiredTiger in two phases. First, we implemented the design where the size of the mapped file area never changes. Then, after making sure that this simple design works and yields performance improvements, we added the feature of remapping files as they grow or shrink. That feature required efficient inter-thread synchronization and was the trickiest part of the whole design -- we highlight it later in this section. Our changes have been in testing in the develop branch of WiredTiger as of January 2020. As of the time of the writing, these changes are only for POSIX systems; a Windows port is planned for the future. Assuming a fixed-size mapped file area Implementing this part required few code changes. WiredTiger provides wrappers for all file-related operations, so we only needed to modify those wrappers. Upon opening the file, we issue the mmap system call to also map it into the virtual address space. Subsequent calls to wrappers that read or write the file will copy the desired part of the file from the mapped area into the supplied buffer. WiredTiger allows three ways to grow or shrink the size of the file. 
The file can grow explicitly via a fallocate system call (or its equivalent), it can grow implicitly if the engine writes to the file beyond its boundary, or the file can shrink via the truncate system call. In our preliminary design we disallowed explicitly growing or shrinking the file, which did not affect the correctness of the engine. If the engine writes to the file beyond the mapped area, our wrapper functions simply default to using system calls. If the engine then reads the part of the file that had not been mapped, we also resort to using a system call. While this implementation was decent as an early prototype, it was too limiting for a production system. Resizing the mapped file area The trickiest part of this feature is synchronization. Imagine the following scenario involving two threads, one of which is reading the file and another one truncating it. Prior to reading, the first thread would do the checks on the mapped buffer to ensure that the offset from which it reads is within the mapped buffer’s boundaries. Assuming that it is, it would proceed to copy the data from the mapped buffer. However, if the second thread intervenes just before the copy and truncates the file so that its new size is smaller than the offset from which the first thread reads, the first thread’s attempt to copy the data would result in a crash. This is because the mapped buffer is larger than the file after truncation and attempting to copy data from the part of the buffer that extends beyond the end of the file would generate a segmentation fault. An obvious way to prevent this problem is to acquire a lock every time we need to access the file or change its size. Unfortunately, this would serialize I/O and could severely limit performance. Instead, we use a lock-free synchronization protocol inspired by read-copy-update (RCU) [ 9 ]. We will refer to all threads that might change the size of the file as writers. 
A writer, therefore, is any thread that writes beyond the end of the file, extends it via a fallocate system call, or truncates it. A reader is any thread that reads the file.

Our solution works as follows: A writer first performs the operation that changes the size of the file and then remaps the file into the virtual address space. During this time we want nobody else accessing the mapped buffer, neither readers nor writers. However, it is not necessary to prevent all I/O from occurring at this time; we can simply route I/O to system calls while the writer is manipulating the mapped buffer, since system calls are properly synchronized in the kernel with other file operations.

To achieve these goals without locking, we rely on two variables:

- mmap_resizing: when a writer wants to indicate to others that it is about to exclusively manipulate the mapped buffer, it atomically sets this flag.
- mmap_use_count: a reader increments this counter prior to using the mapped buffer, and decrements it when it is done. So this counter tells us if anyone is currently using the buffer. The writer waits until this counter goes to zero before proceeding.

Before resizing the file and the mapped buffer, writers execute the function prepare_remap_resize_file; its pseudocode is shown below. Essentially, the writer efficiently waits until no one else is resizing the buffer, then sets the resizing flag to claim exclusive rights to the operation. Then, it waits until all the readers are done using the buffer.

    prepare_remap_resize_file:

    wait:
        /* wait until no one else is resizing the file */
        while (mmap_resizing != 0)
            spin_backoff(...);

        /* Atomically set the resizing flag, if this fails retry. */
        result = cas(mmap_resizing, 1, …);
        if (result)
            goto wait;

        /* Now that we set the resizing flag, wait for all readers
         * to finish using the buffer */
        while (mmap_use_count > 0)
            spin_backoff(...);

After executing prepare_remap_resize_file, the writer performs the file-resizing operation, unmaps the buffer, remaps it with the new size and resets the resizing flag.

The synchronization performed by the readers is shown in the pseudocode of the function read_mmap:

    read_mmap:

        /* Atomically increment the reference counter,
         * so no one unmaps the buffer while we use it. */
        atomic_add(mmap_use_count, 1);

        /* If the buffer is being resized, use the system call
         * instead of the mapped buffer. */
        if (mmap_resizing) {
            atomic_decr(mmap_use_count, 1);
            read_syscall(...);
        } else {
            memcpy(dst_buffer, mapped_buffer, …);
            atomic_decr(mmap_use_count, 1);
        }

As a side note, threads writing the file must perform both the reader synchronization, as in read_mmap, to see if they can use the memory-mapped buffer for I/O, and the writer synchronization in the case they are writing past the end of the file (hence extending its size). Please refer to the WiredTiger develop branch for the complete source code.

Batching file system operations

As we mentioned earlier, a crucial finding of the UCSD study that inspired our design [4] was the need to batch expensive file system operations by pre-allocating file space in large chunks. Our experiments with WiredTiger showed that it already uses this strategy to some extent. We ran experiments comparing two configurations: (1) in the default configuration WiredTiger uses the fallocate system call to grow files; (2) in the restricted configuration WiredTiger is not allowed to use fallocate and thus resorts to implicitly growing files by writing past their end. We measured the number of file system invocations in both cases and found that it was at least an order of magnitude smaller in the default configuration than in the restricted one.
This tells us that WiredTiger already batches file system operations. Investigating whether batching can be optimized for further performance gains is planned for the future. Performance To measure the impact of our changes, we compared the performance of the mmap branch and the develop branch on the WiredTiger benchmark suite WTPERF. WTPERF is a configurable benchmarking tool that can emulate various data layouts, schemas, and access patterns while supporting all kinds of database configurations. Out of 65 workloads, the mmap branch improved performance for 19. Performance of the remaining workloads either remained unchanged or showed insignificant changes (within two standard deviations of the average). Variance in performance of two workloads (those that update a log-structured merge tree) increased by a few percent, but apart from these, we did not observe any downsides to using mmap. The figures below show the performance improvement, in percent, of the mmap branch relative to develop for the 19 benchmarks where mmap made a difference. The experiments were run on a system with an Intel Xeon processor E5-2620 v4 (eight cores), 64GB of RAM and an Intel Pro 6000p series 512GB SSD drive. We used default settings for all the benchmarks and ran each at least three times to ensure the results are statistically significant. All but 2 of the benchmarks where mmap made a difference show significant improvements Overall, there are substantial performance improvements for these workloads, but there are a couple interesting exceptions. For 500m-btree-50r50u and for update-btree some operations (e.g., updates or inserts) are a bit slower with mmap, but others (typically reads) are substantially faster. It appears that some operations benefit from mmap at the expense of others; we are still investigating why this is happening. One of the variables that explains improved performance with mmap is increased rate of I/O. 
For example, for the 500m-btree-50r50u workload (this workload simulates a typical MongoDB load) the read I/O rate is about 30% higher with mmap than with system calls. This statistic does not explain everything: after all, read throughput for this workload is 63% better with mmap than with system calls. Most likely, the rest of the difference is due to more efficient code paths of memory-mapped I/O (as opposed to going through system calls), as observed in earlier work [8]. Indeed, we typically observe a higher CPU utilization when using mmap.

Conclusion

Throughput and latency of storage devices improve at a higher rate than CPU speed thanks to radical innovations in storage technology and the placement of devices in the system. Faster storage devices reveal inefficiencies in the software stack. In our work we focused on overhead related to system calls and file system access and showed how it can be navigated by employing memory-mapped I/O. Our changes in the WiredTiger storage engine yielded up to 63% improvement in read throughput. For more information on our implementation, we encourage you to take a look at the files os_fs.c and os_fallocate.c in the os_posix directory of the WiredTiger develop branch.

References

[1] List of Intel SSDs. https://en.wikipedia.org/wiki/List_of_Intel_SSDs
[2] Optane DC Persistent Memory. https://www.intel.ca/content/www/ca/en/architecture-and-technology/optane-dc-persistent-memory.html
[3] Linux® Storage System Analysis for e.MMC with Command Queuing. https://www.micron.com/-/media/client/global/documents/products/white-paper/linux_storage_system_analysis_emmc_command_queuing.pdf?la=en
[4] Jian Xu, Juno Kim, Amirsaman Memaripour, and Steven Swanson. 2019. Finding and Fixing Performance Pathologies in Persistent Memory Software Stacks. In 2019 Architectural Support for Programming Languages and Operating Systems (ASPLOS ’19). http://cseweb.ucsd.edu/~juk146/papers/ASPLOS2019-APP.pdf
[5] Jian Xu and Steven Swanson. NOVA: A Log-structured File System for Hybrid Volatile/Non-volatile Main Memories. 14th USENIX Conference on File and Storage Technologies (FAST ’16). https://www.usenix.org/system/files/conference/fast16/fast16-papers-xu.pdf
[6] Rohan Kadekodi, Se Kwon Lee, Sanidhya Kashyap, Taesoo Kim, Aasheesh Kolli, and Vijay Chidambaram. 2019. SplitFS: reducing software overhead in file systems for persistent memory. In Proceedings of the 27th ACM Symposium on Operating Systems Principles (SOSP ’19). https://www.cs.utexas.edu/~vijay/papers/sosp19-splitfs.pdf
[7] SSD vs HDD. https://www.enterprisestorageforum.com/storage-hardware/ssd-vs-hdd.html
[8] Why mmap is faster than system calls. https://medium.com/@sasha_f/why-mmap-is-faster-than-system-calls-24718e75ab37
[9] Paul McKenney. What is RCU, fundamentally? https://lwn.net/Articles/262464/

March 16, 2020
Engineering Blog

Transpiling Between Any Programming Languages

Input is converted via an ANTLR parse tree and code generation to output.

MongoDB Compass, the UI for MongoDB, recently introduced a pair of features to make it easier for developers to work in their chosen language. Users can now export the queries and aggregations they build in the UI to their preferred language. Soon they will also be able to input them in their preferred language. Allowing developers the flexibility to choose between multiple input languages and multiple output languages when using Compass required us to build a custom solution in the form of a many-to-many transpiler. Most compilers are one-to-one, or less commonly, one-to-many or many-to-one. There are hardly any many-to-many transpilers.

To avoid having to start from scratch, we leveraged the open source parsing tool ANTLR, which provided us with a set of compiler tools along with preexisting grammars for the languages we needed. We successfully minimized the amount of additional complexity by coming up with a creative set of class hierarchies that reduced the amount of work needed from n² to n.

Motivation

MongoDB Compass is an application that provides a UI for the database and helps developers to iteratively develop aggregations and queries. When building queries, the application currently requires input to be made in a JavaScript-based query language called MongoDB Shell.

Compass aggregation pipeline builder.

To enable developers to use their preferred programming language when developing aggregation pipelines and queries, we wanted to add functionality in two parts. First, we wanted to allow developers that are familiar with the MongoDB Shell to export queries they create in the language they need (Python, Java, etc.). Second, we wanted to allow developers to use their language of choice while building a query. To achieve both and allow users maximum flexibility, our system therefore needed to accept multiple input languages as well as generate multiple output languages in an efficient way.
Compass Export to Language allows you to export a pipeline in the language of your choice At the basis of these features is sophisticated compiler technology in the form of a transpiler. A transpiler is a source-to-source compiler which takes the source code of a program written in one programming language as its input and produces the equivalent source code in another programming language. Since our transpiler currently supports extended JSON, also called BSON, we call it a BSON transpiler. While we currently support only a subset of each programming language, the transpiler is designed in a way that would allow us to extend support to include the entire language syntax. Design Approach The Compass application is designed with an extensible plugin architecture allowing us to build the transpiler as a standalone plugin. To work with the Electron framework our application is based on, our plugin needed to be executable in JavaScript. There are lots of different transpilers in JavaScript which we considered. However, for our use case, we needed any language to any language transformation with support for BSON, which meant we needed a custom solution. Compass queries always take the form of either an array of BSON documents (stages for the aggregation pipeline) or a BSON document (for other queries) containing the MongoDB query language. While this constraint reduces the scope of the problem for the BSON transpiler, the language subset we need to support is large and complex enough that we decided to treat the problem as if we were adding full-language support. The naive approach to building a source-to-source compiler supporting multiple languages would result in a polynomial amount of effort since the number of language combinations is the product of the number of input and output languages. We needed to build a sustainable design so that adding new input and output languages only requires building O(1) components per language. 
This reduces the entire problem to O(n) for the number of languages n. We achieved this by abstracting the problem into independent input and output stages that are loosely coupled by their interface to a shared, intermediate, in-memory data structure: a parse tree. The input language stage just needs to build the tree, and the output language stage just needs to read it. Most compilers have two primary stages: parsing and code generation. The parsing stage is responsible for turning the literal text of a program into a tree representing an abstraction of its meaning, and the code generation stage walks that tree and produces an output that can be executed -- generally a binary of machine or virtual machine instructions. A key observation is that a source-to-source compiler can be seen as a specialized compiler in which the code generation stage generates program text, in another user-friendly language, instead of machine code. The design of our transpiler stems from that concept. Parsing In order to process some source code, such as the string new NumberDecimal(5) , a lexical analyzer or lexer takes raw code and splits it into tokens (this process is known as lexical analysis). A token is an object that represents a block of text corresponding to one of the primitive components of the language syntax. It could be a number, label, punctuation, an operator, etc. In the parsing stage these tokens are then transformed into a tree structure that describes not only isolated pieces of the input code but also their relationship to each other. At this point the compiler is able to recognise language constructs such as variable declarations, statements, expressions, and so on. The leaves of this tree are the tokens found by the lexical analysis. When the leaves are read from left to right, the sequence is the same as in the input text. 
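To make the lexing step concrete, here is a minimal sketch of a tokenizer for the string new NumberDecimal(5). It is a hand-written toy, not ANTLR and not the lexer the transpiler uses; the rule names and the `tokenize` helper are assumptions made up for illustration. It shows the property described above: reading the tokens from left to right reproduces the input text.

```javascript
// Toy lexer for illustration only. The rule names below are hypothetical
// and do not come from any ANTLR grammar.
const TOKEN_RULES = [
  ['Whitespace', /^\s+/],
  ['Keyword', /^new\b/],
  ['Identifier', /^[A-Za-z_$][\w$]*/],
  ['Number', /^\d+(\.\d+)?/],
  ['Punctuation', /^[(),]/],
];

// Split the source text into { type, text } tokens, trying rules in order.
function tokenize(source) {
  const tokens = [];
  let rest = source;
  while (rest.length > 0) {
    const rule = TOKEN_RULES.find(([, pattern]) => pattern.test(rest));
    if (!rule) {
      throw new SyntaxError(`Unexpected character: ${rest[0]}`);
    }
    const [type, pattern] = rule;
    const text = rest.match(pattern)[0];
    tokens.push({ type, text });
    rest = rest.slice(text.length);
  }
  return tokens;
}

const tokens = tokenize('new NumberDecimal(5)');

// Concatenating the token texts in order gives back the original input.
const reassembled = tokens.map((token) => token.text).join('');
console.log(tokens.map((token) => `${token.type}:${JSON.stringify(token.text)}`).join(' '));
console.log(reassembled === 'new NumberDecimal(5)');
```

A real lexer generated from a grammar handles far more cases (string literals, comments, operators), which is exactly the work a parser-generator takes off our hands.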
Stages of compiler processing: Input is transformed via lexical analysis into tokens, tokens are transformed via syntax analysis into an AST which is used to generate the output code We did not want to write our own parser and lexer since it is incredibly time consuming even for a single language, and we have to support multiple. Luckily, there are many "parser-generator" tools that efficiently generate syntax trees from a set of rules, called a grammar. These tools take an input grammar, which is hierarchical and highly structured, parse an input string based on that grammar, and convert it to a tree structure. The tricky part of using parser-generators is the tedious and error-prone process of writing the grammars. Writing a grammar from scratch requires a detailed knowledge of the input language with all of its edge cases. If the transpiler needs to support many programming languages, we would have to write grammars for each of the input languages which would be a huge task. Source-to-source transformation with ANTLR This is why we decided to use ANTLR , a powerful parser-generator that, most importantly, already has grammars for almost all programming languages of interest. ANTLR also has a JavaScript runtime, allowing us to use it in our Node.js project. We considered using the LLVM-IR, a different set of compiler technologies that compile to an intermediate high-level representation . This approach would then need a separate step to compile the intermediate representation into the target language. This is a common pattern for multi-platform compilers, like the Clang/ LLVM project . Unfortunately, there are not many existing compilers that go from the intermediate representation back to user programming languages. We would have had to write those compilers ourselves, so ultimately using LLVM would not have saved us much effort. 
The code snippet below illustrates the basic structure of a program that builds a parse tree for ECMAScript (JavaScript) input source code. This code imports auxiliary lexer and parser files and lets ANTLR pull characters from the input string, create a character stream, convert it to a token stream and finally build a parse tree.

    // It takes only a few lines to go from a string input to a fully parsed traversable tree!
    const antlr4 = require('antlr4');
    const ECMAScriptLexer = require('./lib/antlr/ECMAScriptLexer.js');
    const ECMAScriptParser = require('./lib/antlr/ECMAScriptParser.js');

    const input = 'new NumberDecimal(5)';
    const chars = new antlr4.InputStream(input);
    const lexer = new ECMAScriptLexer.ECMAScriptLexer(chars);
    const tokens = new antlr4.CommonTokenStream(lexer);
    const parser = new ECMAScriptParser.ECMAScriptParser(tokens);
    const tree = parser.program();

The resulting parse tree inherits from the ANTLR-defined ParseTree class, giving it a uniform way to be traversed. Note that the parsing stage and resulting parse tree are determined by the input language; they are completely independent of the stage where we generate the output language into which we seek to translate the source code. This independence in our design allows us to reduce the number of parts we need to write to cover our input and output languages from O(n²) to O(n).

Code Generation

Tree types

Using ANTLR for its library of pre-built grammars requires a slight compromise in our design. To understand why, it is necessary to understand the difference between two terms that are related and sometimes used interchangeably: a parse tree and an abstract syntax tree (AST). Conceptually these trees are similar because they both represent the syntax of a snippet of source code; the difference is the level of abstraction. An AST has been fully abstracted to the point that no information about the input tokens themselves remains.
Because of this, ASTs representing the same instructions are indistinguishable, regardless of what language produced them. By contrast, a parse tree contains the information about the low-level input tokens, so different languages will produce different parse trees, even if they do the same thing. Abstract syntax tree and parse tree comparison given an input of "new NumberDecimal(5)" Ideally, our code generating stage would operate on an AST, not a parse tree, because having to account for language-specific parse trees introduces complexity we’d rather avoid. ANTLR4, however, only produces read-only parse trees. But the advantages of using ANTLR and its ready-made grammars are well worth that trade-off. Visitors Parse tree traversal Like most compilers, the BSON transpiler uses a visitor pattern to traverse parse trees. ANTLR not only builds a parse tree but it also programmatically generates a skeleton visitor class. This visitor class contains methods for traversing parse trees (one visit method for each type of node of the tree). All of these methods begin with visit and end with the name of the node that it will visit - e.g. visitFuncCall() or visitAdditiveExpression() . The node names are taken directly from the input language grammar file, so each visitor class and its methods are tailored to the input language grammar. In the ANTLR-generated visitor class, these methods do not do anything except recurse on the child nodes. In order for our visitor to be able to transpile code, we need to subclass the generated visitor class and override each visit method to define what to do with each type of node. Since the BSON transpiler is built to support multiple input languages, and each language will produce a different kind of parse tree, we need to create one custom visitor for each input language supported by Compass. 
However, as long as we avoid building a custom visitor for each combination of input and output language, we are still only building O(n) components. With this design, each visitor is responsible for traversing a single language parse tree. The visitor calls functions as it visits each node and returns the original text of the node or can transform this text the way we need. Starting from the root, the visitor calls the visit method recursively, descending to the leaves in a depth-first order. On the way down, the visitor decorates nodes with metadata, such as type information. On the way up, it returns the transpiled code. Generators With a brute force solution, the visit* methods of the visitor would contain code for generating the output language text. To generate multiple output languages, we would have to specialize each method depending on the current output language. Overall, this approach would subclass every language-specific visitor class once for every output language, or worse yet, put a giant switch statement in each of the visit* methods with a case for each output language. Both of those options are brittle and require O(n²) development effort. Therefore we chose to decouple the code for traversing the language-specific trees from the code for generating output. We accomplished this by encapsulating code-generation for each language into a set of classes called Generators, which implement a family of emit* methods, like emitDate and emitNumber used to produce the output code. Class composition Class dependency diagram Our design was informed by the need for the visitor to be able to call generator methods without needing to know which generator they were using. Since code generation actually has a lot in common regardless of the output language, we wanted to implement a system where we could abstract the default behavior as much as possible and leave the generator to only handle edge cases. 
We chose to make use of JavaScript’s dynamic mechanics for inheritance and method dispatch by having the generator class inherit from the visitor class. Because JavaScript does not require that methods be defined before they are called, the visitor can call emit methods on itself that are actually defined in the generator and the generator can call visitor methods to continue the tree traversal. Using a generator class determined by the output language and a visitor class determined from the input language, we are able to compose a transpiler on-the-fly as it is exported. Generators are similar to an abstract interface, except there are no classic interfaces in JavaScript.

As illustrated in the code snippet below, for each language combination our application creates a specialized transpiler instance composed of the corresponding visitor and generator classes. When our application receives a piece of code from the user, it creates a parse tree. The transpiler then visits the parse tree, using the ParseTreeVisitor’s visit method inherited from our custom Visitor subclass, and the language-specific, ANTLR generated visitor class (such as ECMAScriptVisitor).

    // Each composite transpiler instance has the ability to traverse the parse tree
    // for a specific language with its 'visit*' methods, and generate output code for
    // another language with its 'emit*' methods.
    const getJavascriptVisitor = require('./codegeneration/javascript/Visitor');
    const getJavaGenerator = require('./codegeneration/java/Generator');
    const getPythonGenerator = require('./codegeneration/python/Generator');
    ...
    const loadJSTree = (input) => {
      /* Lexing and parsing the user input */
      ...
    };

    /**
     * Compose a transpiler and return a compile method that will use that transpiler
     * to visit the tree and return generated code.
     *
     * @param {function} loadTree - the method takes in user input and returns a tree.
     * @param {Visitor} visitor - the input-language visitor.
     * @param {function} generator - returns a generator that inherits from its arg.
     *
     * @returns {function} the compile function to be exported
     */
    const composeTranspiler = (loadTree, visitor, generator) => {
      const Transpiler = generator(visitor);
      const transpiler = new Transpiler();
      return {
        compile: (input) => {
          const tree = loadTree(input);
          return transpiler.start(tree);
        }
      };
    };

    module.exports = {
      javascript: {
        java: composeTranspiler(
          loadJSTree,
          getJavascriptVisitor(JavascriptANTLRVisitor), // Visitor + ANTLR visitor
          getJavaGenerator // Method that takes in a superclass, i.e. the visitor
        ),
        python: composeTranspiler(
          loadJSTree,
          getJavascriptVisitor(JavascriptANTLRVisitor),
          getPythonGenerator
        ),
        ...
      },
      ...
    };

Tree Traversal Example

Simple Nodes

In the most straightforward case, such as the JavaScript string literal "hello world", the first thing the custom visitor class needs to do is specify the entry point for the tree traversal. Since the entry nodes in different languages have different names (e.g. file_input in Python, but program in JavaScript), we define a method in each visitor called start that calls the visit method for the root node for that input language. That way our compiler can simply call start on each visitor without having to worry what the root node is called.

    // Entry point for the tree traversal
    class Visitor extends ECMAScriptVisitor {
      start(ctx) {
        return this.visitProgram(ctx);
      }
    }

The default behavior of the ANTLR visit methods is to recurse on each child node and return the results in an array. If the node doesn’t have any children, then the visit method will return the node itself. So if we do not override any of the ANTLR methods, then the return value of our start method would be an array of nodes. To go from returning nodes to returning strings in our simple "hello world" example, we first override the visitTerminal method so that the leaf nodes will return the raw text stored in the node.
We then modify the visitChildren method so that instead of putting the results of visiting each child node into an array, the results get concatenated into a string. Those two changes are enough for our "hello world" example to be fully translated into a language that uses the same string representation, like Python.

    // Overriding the 'visitTerminal()' and 'visitChildren()' methods
    class Visitor extends ECMAScriptVisitor {
      start(ctx) {
        return this.visitProgram(ctx);
      }

      // Visits a leaf node and returns a string
      visitTerminal(ctx) {
        return ctx.getText();
      }

      // Concatenate the results of recursing on child nodes
      visitChildren(ctx) {
        return ctx.children.reduce(
          (code, child) => `${code} ${this.visit(child)}`,
          ''
        );
      }
    }

Transformations

However, we cannot always just concatenate the text values of the terminal nodes to form the result. Instead we need to transform floating point numbers, as well as numbers in different numeral systems, without losing any precision. For string literals we need to think about single and double quotes, escape sequences, comments, spaces and empty lines. This type of transformation logic can be applied to any type of node. Let’s look at a concrete example: in Python, an object property name must be enclosed in quotes ({'hello': 'world'}); in JavaScript this is optional ({hello: 'world'}). In this particular case, this is the only modification we need in order to transform a fragment of JavaScript code into Python code.

    // Transformation of JavaScript code into Python code
    class Visitor extends ECMAScriptVisitor {
      ...
      visitPropertyExpressionAssignment(ctx) {
        const key = this.visit(ctx.propertyName());
        const value = this.visit(ctx.singleExpression());

        // Delegate to the generator when it defines a matching emit method
        if ('emitPropertyExpressionAssignment' in this) {
          return this['emitPropertyExpressionAssignment'](ctx);
        }

        return `${key}: ${value}`;
      }
    }

The propertyExpressionAssignment node has two child nodes (propertyName and singleExpression).
To get the values of these two child nodes, we need to traverse them separately as left hand side and right hand side subtrees. Traversing subtrees returns the original or transformed values of the child nodes. We can then build a new string from the retrieved values to make up the transformed code fragment. Instead of doing this in the visitor directly, we check if the corresponding emit method exists. If the visitor finds a proper emit method, it will delegate the transformation process to the generator class. By doing this we free our visitors from knowing anything about the output language. We just assume that there is some generator class that knows how to handle the output language. However, if this method doesn’t exist, the visitor will return the original string without any transformation. In our case we assume an emitPropertyExpressionAssignment method was supplied and that it returns the transformed JavaScript string.

Processing

In more complex cases, we must do some preprocessing in the visitor before we can call any emit methods. For example, date representations are a complex case because dates have a wide range of acceptable argument formats across different programming languages. We need to do some preprocessing in the visitor so we can ensure that all the emit methods are sent the same information, regardless of input language. In the case of a date node, the easiest way to represent date information is to construct a JavaScript Date object and pass it to the generator. Node types that need preprocessing must have a process* method defined in the visitor to handle this preprocessing. For this example it would be called processDate.
    // 'processDate()' creates a date object to pass it to the emit method
    processDate(node) {
      let text = node.getText(); // Original input text for this node
      let date;

      try {
        date = this.executeJavascript(text); // Construct a date object in a sandbox
      } catch (error) {
        throw new BsonTranspilersError(error.message);
      }

      if ('emitDate' in this) {
        return this.emitDate(node, date);
      }
      ...
    }

For this processDate method, since we are compiling JavaScript and the transpiler is written in JavaScript, we took a shortcut: executing the user’s input to construct the Date. Because it has already been tokenized we know exactly what the code contains so it is safe to execute in a sandbox. For processing dates in other languages we would instead parse the results and construct the date object through arguments. Upon completion, the process method will then call the respective emit* method, emitDate, and pass it the constructed Date as an argument. Now we can call the required process and emit methods from the visitor’s appropriate visit method.

    // This is a generator that generates code for Python.
    // The 'emitDate()' method is defined in the Generator and called from the Visitor
    module.exports = (superClass) => class Generator extends superClass {
      emitDate(node, date) {
        const dateStr = [
          date.getUTCFullYear(),
          date.getUTCMonth() + 1,
          date.getUTCDate(),
          date.getUTCHours(),
          date.getUTCMinutes(),
          date.getUTCSeconds()
        ].join(', ');

        return `datetime.datetime(${dateStr}, tzinfo=datetime.timezone.utc)`;
      }
    };

Given the input string Date('2019-02-12T11:31:14.828Z'), the root of the parse tree will be a FuncCallExpression node. The visit method for this node will be called visitFuncCallExpression().

    // Example of a Visitor class that extends the ECMAScript grammar
    class Visitor extends ECMAScriptVisitor {
      /**
       * Visits a node that represents a function call.
       *
       * @param {FuncCallExpression} node - The tree node
       * @return {String} - The generated code
       */
      visitFuncCallExpression(node) {
        const lhs = this.visit(node.functionName());
        const rhs = this.visit(node.arguments());

        if (`process${lhs}` in this) {
          return this[`process${lhs}`](node);
        }

        if (`emit${lhs}` in this) {
          return this[`emit${lhs}`](node);
        }

        return `${lhs}${rhs}`;
      }
      ...
    }

The first thing the visit method does is recurse on its two child nodes. The left-hand child represents the function name node, i.e. Date. The right-hand child represents the arguments node, i.e. 2019-02-12T11:31:14.828Z. Once the method retrieves the name of the function it can check to see if that function requires any preprocessing. It checks if the processDate method is defined and, failing that check, whether an emitDate method is defined. Even though the emitDate method is defined in the generator, since the visitor and generator are composed into one class, the visitor treats emit methods as if they were its own class methods. If neither method exists, the visit* method will return a concatenation of the results of the recursion on the child nodes.

Every input language has its own visitor that can contain processing logic and every output language has its own generator that contains the required transformation logic for the specific language. As a rule, transformations required by all output languages will happen as processing logic, while all other transformations happen in the generator. With this design, different transpilers based on different visitors can use the same generator methods. That way, for every input language we add, we only need to define a single visitor. Similarly, for every output language we add, we only need to define a single generator. For n languages we want to support, we now have O(n) amount of work instead of having to write one visitor-generator for every language combination.
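The O(n) argument can be sketched with a self-contained toy. Everything here is hypothetical (the class names, the `{ text }` node shape, and the single `emitBoolean` method are assumptions for illustration, not the plugin's real classes): two visitors and two generators are written once each, yet every input/output pairing is obtained by composition, using the same "generator inherits from visitor" mixin idea described above.

```javascript
// One visitor per INPUT language. Each one knows how to read its own
// (toy) tree node and normalizes it before delegating to an emit method.
class JavascriptVisitor {
  start(node) {
    const value = node.text === 'true';
    // Delegate to the generator if it defines emitBoolean; else echo the input.
    return 'emitBoolean' in this ? this.emitBoolean(value) : node.text;
  }
}

class PythonVisitor {
  start(node) {
    const value = node.text === 'True';
    return 'emitBoolean' in this ? this.emitBoolean(value) : node.text;
  }
}

// One generator per OUTPUT language. Each is a function that takes a
// visitor class and returns a subclass adding the emit* methods.
const javaGenerator = (Visitor) => class extends Visitor {
  emitBoolean(value) {
    return value ? 'true' : 'false';
  }
};

const pythonGenerator = (Visitor) => class extends Visitor {
  emitBoolean(value) {
    return value ? 'True' : 'False';
  }
};

const visitors = { javascript: JavascriptVisitor, python: PythonVisitor };
const generators = { java: javaGenerator, python: pythonGenerator };

// Compose every input/output combination from the 2 + 2 components above.
const transpilers = {};
for (const [input, Visitor] of Object.entries(visitors)) {
  transpilers[input] = {};
  for (const [output, generator] of Object.entries(generators)) {
    const Transpiler = generator(Visitor);
    transpilers[input][output] = new Transpiler();
  }
}

const jsToPython = transpilers.javascript.python.start({ text: 'true' });
const pythonToJava = transpilers.python.java.start({ text: 'False' });
console.log(jsToPython, pythonToJava);
```

Adding a third output language to this sketch means writing one more generator function; the loop then produces the extra combinations without any per-pair code.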
Conclusion

The Compass BSON transpiler plugin has the potential to parse and generate MongoDB queries and aggregations in any programming language. The current version supports several input (MongoDB Shell, JavaScript, and Python) and output (Java, C#, Python, MongoDB Shell, and JavaScript) languages. The BSON transpiler plugin is built as a standalone Node.js module and can be used in any browser-based or Node.js application with npm install bson-transpilers. Like many other MongoDB projects, the BSON transpiler plugin is open source: you can visit the repo, and we welcome contributions. If you want to contribute to the Compass BSON transpiler, please check our contributing section on GitHub.

When writing the BSON transpiler, we were guided by general compiler design principles (lexical analysis, syntax analysis, tree traversal). We used ANTLR to reduce the amount of manual work required to parse the input languages of interest, which allowed us to focus mostly on modularizing the code generation process. A major benefit of modularizing the language definitions is that a user can contribute a new output language without needing to know anything about the input languages that are currently supported. The same rule applies for adding a new input language: you should be able to define your visitor without needing to care about the existing generators.

The latest version of the BSON transpiler plugin is more complex and powerful than what has been covered in this blog post. It supports a wider range of syntax through the use of a symbol table. It also includes the entire BSON library, function calls with arguments and type validation, and informative error messages. On top of that, we have added a high level of optimization by using string templates to abstract a lot of the code generation. All of these developments will be described in a future blog post.

Written by Anna Herlihy, Alena Khineika, & Irina Shestak.
Illustrations by Irina Shestak

Further Reading

Compiler in JavaScript using ANTLR by Alena Khineika; Compiler Construction by Niklaus Wirth; Compilers: Principles, Techniques, and Tools by Alfred V. Aho, Monica S. Lam, Ravi Sethi and Jeffrey D. Ullman; The Elements of Computing Systems by Noam Nisan and Shimon Schocken.

July 1, 2019
Engineering Blog

Repeatable Performance Tests: CPU Options Are Best Disabled

In an effort to improve repeatability, the MongoDB Performance team set out to reduce noise on several performance test suites run on EC2 instances. At the beginning of the project, it was unclear whether our goal of running repeatable performance tests in a public cloud was achievable. Instead of debating the issue based on assumptions and beliefs, we decided to measure noise itself and see if we could make configuration changes to minimize it. After thinking about our assumptions and the experiment setup, we began by recording data about our current setup and found no evidence of particularly good or bad EC2 instances. In the next step, we investigated IO and found that EBS instances are the stable option for us. Having found a very stable behavior as far as disks were concerned, this third and final experiment turns to tuning CPU related knobs to minimize noise from this part of the system.

Investigate CPU Options

We already built up knowledge around fine-tuning CPU options when setting up another class of performance benchmarks (single node benchmarks). That work had shown us that CPU options could also have a large impact on performance. Additionally, it left us familiar with a number of knobs and options we could adjust.

Knob | Where to set | Setting | What it does
Idle Strategy | Kernel Boot | idle=poll | Puts Linux into a loop when idle, checking for work.
Max sleep state (c4 only) | Kernel Boot | intel_idle.max_cstate=1 intel_pstate=disable | Disables the use of advanced processor sleep states.
CPU Frequency | Command Line | sudo cpupower frequency-set -d 2.901GHz | Sets a fixed frequency. Doesn't allow the CPU to vary the frequency for power saving.
Hyperthreading | Command Line | echo 0 > /sys/devices/system/cpu/cpu$i/online | Disables hyperthreading. Hyperthreading allows two software threads of execution to share one physical CPU. They compete against each other for resources.

We added some CPU specific tests to measure CPU variability. These tests allow us to see if the CPU performance is noisy, independently of whether that noise makes MongoDB performance noisy. For our previous work on CPU options, we wrote some simple tests in our C++ harness that would, for example:

- multiply numbers in a loop (cpu bound)
- sleep 1 or 10 ms in a loop
- do nothing (no-op) in the basic test loop

We added these tests to our System Performance project. We were able to run the tests on the client only, and going across the network. We ran our tests 5x5 times, changing one configuration at a time, and compared the results. The first two graphs below contain results for the CPU-focused benchmarks, the third contains the MongoDB-focused benchmarks. In all the below graphs, we are graphing the "noise" metric as a percentage computed from (max-min)/median and lower is better.

We start with our focused CPU tests, first on the client only, and then connecting to the server. We’ve omitted the sleep tests from the client graphs for readability, as they were essentially 0.

Results for CPU-focused benchmarks with different CPU options enabled

The nop test is the noisiest test all around, which is reasonable because it’s doing nothing in the inner loop. The cpu-bound loop is more interesting. It is low on noise for many cases, but has occasional spikes for each case, except for the case of the c3.8xlarge with all the controls on (pinned to one socket, hyperthreading off, no frequency scaling, idle=poll).

Results for tests run on server with different CPU options enabled

When we connect to an actual server, the tests become more realistic, but also introduce the network as a possible source of noise.
In the cases in which we multiply numbers in a loop (cpuloop) or sleep in a loop (sleep), the final c3.8xlarge with all controls enabled is consistently among the lowest noise and doesn’t do badly on the ping case (no-op on the server). Do those results hold when we run our actual tests? Results for tests run on server with different CPU options enabled Yes, they do. The right-most blue bar is consistently around 5%, which is a great result! Perhaps unsurprisingly, this is the configuration where we used all of the tuning options: idle=poll, disabled hyperthreading and using only a single socket. We continued to compare c4 and c3 instances against each other for these tests. We expected that with the c4 being a newer architecture and having more tuning options, it would achieve better results. But this was not the case, rather the c3.8xlarge continued to have the smallest range of noise. Another assumption that was wrong! We expected that write heavy tests, such as batched inserts, would mostly benefit from the more stable IOPS on our new EBS disks, and the CPU tuning would mostly affect cpu-bound benchmarks such as map-reduce or index build. Turns out this was wrong too - for our write heavy tests, noise did not in fact predominantly come from disk. The tuning available for CPUs has a huge effect on threads that are waiting or sleeping. The performance of threads that are actually running full speed is less affected - in those cases the CPU runs at full speed as well. Therefore, IO-heavy tests are affected a lot by CPU-tuning! 
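The "noise" metric plotted in the graphs above, (max-min)/median expressed as a percentage, is simple to compute. The sketch below is an illustration only: the function names are our own and the throughput numbers are made up, not measurements from these experiments.

```javascript
// Median of a list of numbers (average of the two middle values when even).
function median(values) {
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 0
    ? (sorted[mid - 1] + sorted[mid]) / 2
    : sorted[mid];
}

// Noise as a percentage: (max - min) / median * 100. Lower is better.
function noisePercent(results) {
  if (results.length === 0) {
    throw new RangeError('need at least one result');
  }
  const range = Math.max(...results) - Math.min(...results);
  return (range * 100) / median(results);
}

// Hypothetical results from five repeated runs of the same test.
const sampleThroughput = [95, 100, 100, 105, 110];
const noise = noisePercent(sampleThroughput);
console.log(`${noise.toFixed(1)}%`);
```

Using the median rather than the mean as the denominator keeps a single outlier run from also distorting the baseline the range is compared against.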
Disabling CPU options in production

Deploying these configurations into production made insert tests even more stable from day to day:

Improvements in daily performance measurements through changing to EBS and disabling CPU options

Note that the absolute performance of some tests actually dropped, because the number of available physical CPUs dropped by ½ due to only using a single socket, and disabling hyperthreading causes a further drop, though not quite a full half, of course.

Conclusion

Drawing upon prior knowledge, we decided to fine tune CPU options. We had previously assumed that IO-heavy tests would have a lot of noise coming from disk and that CPU tuning would mostly affect CPU-bound tests. As it turns out, the tuning available for CPUs actually has a huge effect on threads that are waiting or sleeping and therefore has a huge effect on IO-heavy tests. Through CPU tuning, we achieved very repeatable results. The overall measured performance in the tests decreases but this is less important to us. We care about stable, repeatable results more than maximum performance.

This is the third and last of three bigger experiments we performed in our quest to reduce variability in performance tests on EC2 instances. You can read more about the top level setup and results as well as how we found out that EC2 instances are neither good nor bad and that EBS instances are the stable option.

April 30, 2019
Engineering Blog

Reducing Variability in Performance Tests on EC2: Setup and Key Results

On the MongoDB Performance team, we use EC2 to run daily system performance tests. After building a continuous integration system for performance testing, we realized that there were sources of random variation in our platform and system configuration which made a lot of our results non-reproducible. The run-to-run variation from the platform was bigger than the changes in MongoDB performance that we wanted to capture. To reduce such variation (environmental noise) from our test configuration, we set out on a project to measure and control for the EC2 environments on which we run our tests.

At the outset of the project there was a lot of doubt and uncertainty. Maybe using a public cloud for performance tests is a bad idea and we should just give up and buy more hardware to run them ourselves? We were open to that possibility, but we wanted to do our due diligence before taking on the cost and complexity of owning and managing our own test cluster.

Performance benchmarks in continuous integration

MongoDB uses a CI platform called Evergreen to run tests on incoming commits. We also use Evergreen for running multiple classes of daily performance tests. In this project we are focused on our highest level tests, meant to represent actual end-user performance. We call these tests System Performance tests.

For System Performance tests, we use EC2 to deploy real and relatively beefy clusters of c3.8xlarge nodes for various MongoDB clusters: standalone servers, 3 Node Replica Sets, and Sharded Clusters. These are intended to be representative of how customers run MongoDB. Using EC2 allows us to flexibly and efficiently deploy such large clusters as needed. Each MongoDB node in the cluster is run on its own EC2 node, and the workload is driven from another EC2 node.

Repeatability

There's an aspect of performance testing that is not obvious and often ignored.
Most benchmarking blogs and reports focus on the maximum performance of a system, or whether it is faster than some competitor system. For our CI testing purposes, we primarily care about repeatability of the benchmarks. This means that the same set of tests for the same version of MongoDB on the same hardware should produce the same results whether run today or in a few months. We want to be able to detect small changes in performance due to our ongoing development of MongoDB. A customer might not get very upset about a 5% change in performance, but they will get upset about multiple 5% regressions adding up to a 20% regression.

The easiest way to avoid the large regressions is to identify and address the small regressions promptly as they happen, and stop the regressions getting to releases or release candidates. We do want to stress MongoDB with a heavy load, but achieving some kind of maximum performance is completely secondary to this test suite’s goal of detecting changes.

For some of our tests, repeatability wasn't looking so good. In the below graph, each dot represents a daily build (spoiler -- you’ll see this graph again):

Variability in daily performance tests

Eyeballing the range from highest to lowest result, the difference is over 100,000 documents / second from day to day. Or, as a percentage, a 20-30% range.

Investigation

To reduce such variation from our test configuration, we set out on a project to reduce any environmental noise. Instead of focusing on the difference between daily MongoDB builds, we ran tests to focus on EC2 itself.

Process: Test and Analyze

Benchmarking is really an exercise of the basic scientific process:

1. Try to understand a real world phenomenon, such as an application that uses MongoDB
2. Create a model (aka benchmark) of that phenomenon (this may include setting a goal, like "more updates/sec")
3. Measure
4. Analyze and learn from the results
5. Repeat: do you get the same result when running the benchmark / measuring again?
6. Change one variable (based on analysis) and repeat from above

We applied this benchmarking process to evaluate the noise in our system. Our tests produce metrics measuring the average operations per second (ops/sec). Occasionally, we also record other values but generally we use ops/sec as our result.

To limit other variables, we locked the mongod binary to a stable release (3.4.1) and repeated each test 5 times on 5 different EC2 clusters, thus producing 25 data points.

We used this system to run repeated experiments. We started with the existing system and considered our assumptions to create a list of potential tests that could help us determine what to do to decrease the variability in the system. As long as we weren’t happy with the results, we returned to this list and picked the most promising feature to test. We created focused tests to isolate the specific feature, ran the tests, and analyzed our findings. Any workable solutions we found were then put into production.

For each test, we analyzed the 25 data points, with a goal of finding a configuration that minimizes this single metric:

range = (max - min) / median

Being able to state your goal as a single variable such as above is very powerful. Our project now becomes a straightforward optimization process of trying different configurations, in order to arrive at the minimum value for this variable. It's also useful that the metric is a percentage, rather than an absolute value. In practice, we wanted to be able to run all our tests so that the range would always stay below 10%.

Note that the metric we chose to focus on is more ambitious than, for example, focusing on reducing variance. Variance would help minimize the spread of most test results, while being fairly forgiving about one or two outliers. For our use case, an outlier represents a false regression alert, so we wanted to find a solution without any outliers at all, if possible.
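As a minimal sketch of the metric above, the range can be computed over the 25 data points of one experiment as follows. The function name `noise_range` and the ops/sec values are hypothetical and chosen only for illustration; they are not taken from the actual test suite.

```python
from statistics import median


def noise_range(results):
    """Return (max - min) / median for a list of ops/sec results."""
    if not results:
        raise ValueError("need at least one result")
    mid = median(results)
    if mid == 0:
        raise ValueError("median is zero; the range is undefined")
    return (max(results) - min(results)) / mid


# Hypothetical ops/sec values: 5 clusters x 5 trials = 25 data points.
trials_by_cluster = [
    [100_200, 99_800, 100_500, 100_100, 99_900],
    [100_300, 100_000, 99_700, 100_400, 100_100],
    [99_600, 100_200, 100_000, 100_300, 99_900],
    [100_100, 100_400, 99_800, 100_000, 100_200],
    [100_000, 99_900, 100_300, 100_100, 91_000],  # one outlier trial
]
all_results = [r for cluster in trials_by_cluster for r in cluster]

noise = noise_range(all_results)
print(f"range = {noise:.1%}, within 10% target: {noise < 0.10}")
```

Because the metric uses the minimum and maximum, the single outlier in the last cluster determines the result on its own, which is exactly the sensitivity to outliers described above.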
Any experiment of this form has a tension between the accuracy of the statistics, and the expense (time and money) of running the trials. We would have loved to collect many more trials per cluster, and more distinct clusters per experiment giving us higher confidence in our results and enabling more advanced statistics. However, we also work for a company that needed the business impact of this project (lower noise) as soon as possible. We felt that the 5 trials per cluster times 5 clusters per experiment gave us sufficient data fidelity with a reasonable cost. Assume nothing. Measure everything. The experimental framework described above can be summarized in the credo of: Assume nothing. Measure everything. In the spirit of intellectual honesty, we admit that we have not always followed the credo of Assume nothing. Measure everything, usually to our detriment. We definitely did not follow it when we initially built the System Performance test suite. We needed the test suite up as soon as possible (preferably yesterday). Instead of testing everything, we made a best effort to stitch together a useful system based on intuition and previous experience, and put it into production. It’s not unreasonable to throw things together quickly in time of need (or as a prototype). However, when you (or we) do so, you should check if the end results are meeting your needs, and take the results with a large grain of salt until thoroughly verified. Our system gave us results. Sometimes those results pointed us at useful things, and other times they sent us off on wild goose chases. Existing Assumptions We made a lot of assumptions when getting the first version of the System Performance test suite up and running. 
We will look into each of these in more detail later, but here is the list of assumptions that were built into the first version of our System Performance environment:

Assumptions:

- A dedicated instance means more stable performance
- Placement groups minimize network latency & variance
- Different availability zones have different hardware
- For write heavy tests, noise predominantly comes from disk
- Ephemeral (SSD) disks have least variance
- Remote EBS disks have unreliable performance
- There are good and bad EC2 instances

In addition, the following suggestions were proposed as solutions to reducing noise in the system:

- Just use i2 instances (better SSD) and be done with it
- Migrate everything to Google Cloud
- Run on prem -- you’ll never get acceptable results in the cloud

Results

After weeks of diligently executing the scientific process of hypothesize - measure - analyze - repeat, we found a configuration where the range of variation when repeating the same test was less than 5%. Most of the configuration changes were normal Linux and hardware configurations that would be needed on on-premise hardware just the same as on EC2. We thus proved one of the biggest hypotheses wrong: "You can't use cloud for performance testing."

With our first experiment, we found that there was no correlation between test runs and the EC2 instances they were run on. Please note that these results could be based on our usage of the instance type; you should measure your own systems to figure out the best configuration for your own system. You can read more about the specific experiment and its analysis in our blog post EC2 instances are neither good nor bad.

- There are good and bad EC2 instances

After running the first baseline tests, we decided to investigate IO performance. Using EC2, we found that by using Provisioned IOPS we get a very stable rate of disk I/O per second. To us, it was surprising that ephemeral (SSD) disks were essentially the worst choice.
After switching our production configuration from ephemeral SSD to EBS disks, the variation of our test results decreased dramatically. You can read more about our specific findings and how different instance types performed in our dedicated blog post EBS instances are the stable option.

- Ephemeral (SSD) disks have least variance
- Remote EBS disks have unreliable performance -> PIOPS
- Just use i2 instances (better SSD) and be done with it (True in theory)

Next, we turned our attention to CPU tuning. We learned that disabling CPU options stabilizes more than just CPU-bound performance results. In fact, noise in IO-heavy tests also seems to go down significantly with CPU tuning.

- For write heavy tests, noise predominantly comes from disk

After we disabled CPU options, the variance in performance decreased again. In the below graph you can see how changing from SSD to EBS and disabling CPU options reduced the performance variability of our test suite. You can read more about the CPU options we tuned in our blog post Disable CPU options.

Improvements in daily performance measurements through changing to EBS and disabling CPU options

At the end of the project we hadn’t tested all of our original assumptions, but we had tested many of them. We still plan to test the remaining ones when time and priority allow:

- A dedicated instance means more stable performance
- Placement groups minimize network latency & variance
- Different availability zones have different hardware

Through this process we also found that previously suggested solutions would not have solved our pains either:

- Just use i2 instances (better SSD) and be done with it (True in theory)
- Migrate everything to Google Cloud: Not tested!

Conclusion of the tests

In the end, there was still noise in the system, but we had reduced it sufficiently that our System Performance tests were now delivering real business value to the company.
Every bit of noise bothers us, but at the end of the day we got to a level of repeatability in which test noise was no longer our most important performance related problem. As such, we stopped the all out effort on reducing system noise at this point. Adding in safeguards Before we fully moved on to other projects, we wanted to make sure to put up some safeguards for the future. We invested a lot of effort into reducing the noise, and we didn’t want to discover some day in the future that things had changed and our system was noisy again. Just like we want to detect changes in the performance of MongoDB software, we also want to detect changes in the reliability of our test platform. As part of our experiments, we built several canary benchmarks which give us insights into EC2 performance itself based on non-MongoDB performance tests. We decided to keep these tests and run them as part of every Evergreen task, together with the actual MongoDB benchmark that the task is running. If a MongoDB benchmark shows a regression, we can check whether a similar regression can be seen in any of the canary benchmarks. If yes, then we can just rerun the task and check again. If not, it's probably an actual MongoDB regression. If the canary benchmarks do show a performance drop, it is possible that the vendor may have deployed upgrades or configuration changes. Of course in the public cloud this can happen at arbitrary times, and possibly without the customers ever knowing. In our experience such changes are infrequently the cause for performance changes, but running a suite of "canary tests" gives us visibility into the day to day performance of the EC2 system components themselves, and thus increases confidence in our benchmark results. The canary tests give us an indication of whether we can trust a given set of test results, and enables us to clean up our data. 
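The triage rule described above can be sketched roughly as follows. This is an illustration only: the function `triage`, the canary names, and the 5% threshold are assumptions made for the example, not the logic that actually runs in Evergreen.

```python
def triage(mongo_change, canary_changes, threshold=-0.05):
    """Classify a benchmark result.

    mongo_change and the values of canary_changes are relative changes
    against a baseline (for example, -0.10 means 10% slower). The 5%
    threshold is an assumed value for illustration.
    """
    if mongo_change > threshold:
        return "ok"
    # The MongoDB benchmark regressed: did any canary regress as well?
    regressed = sorted(
        name for name, change in canary_changes.items() if change <= threshold
    )
    if regressed:
        return "rerun: platform suspected (" + ", ".join(regressed) + ")"
    return "investigate: likely MongoDB regression"


# Hypothetical canary names and values.
print(triage(-0.30, {"cpu_network": -0.30, "disk_io": 0.01}))
print(triage(-0.10, {"cpu_network": 0.00, "disk_io": -0.01}))
```

Keeping the canaries in the same task as the MongoDB benchmark means both measurements come from the same hosts at the same time, so a shared drop points at the platform rather than at the commit under test.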
Most importantly, we no longer need to debate whether it is possible to run performance benchmarks in a public cloud because we measure EC2 itself! Looking forward This work was completed over 1.5 years ago. Since that time it has provided the foundation that all our subsequent and future work has been built upon. It has led to 3 major trends: We use the results. Because we lowered the noise enough, we are able to regularly detect performance changes, diagnose them, and address them promptly. Additionally, developers are also "patch testing" their changes against System Performance now. That is, they are using System Performance to test the performance of their changes before they commit them, and address any performance changes before committing their code. Not only have we avoided regressions entering into our stable releases, in these cases we’ve avoided performance regressions ever making it into the code base (master branch). We’ve added more tests. Since we find our performance tests more useful, we naturally want more such tests and we have been adding more to our system. In addition to our core performance team, the core database developers also have been steadily adding more tests. As our system became more reliable and therefore more useful, the motivation to create tests across the entire organization has increased. We now have the entire organization contributing to the performance coverage. We’ve been able to extend the system. Given the value the company gets from the system, we’ve invested in extending the system. This includes adding more automation, new workload tools, and more logic for detecting when performance changes. None of that would have been feasible or worthwhile without lowering the noise of the System Performance tests to a reasonable level. We look forward to sharing more about these extensions in the future. 
Coda: Spectre/Meltdown

As we came back from the 2018 New Year's holidays, just like everyone else we got to read the news about the Meltdown and Spectre security vulnerabilities. Then, on January 4, all of our tests went red! Did someone make a bad commit into MongoDB, or was it possible that Amazon had deployed a security update with a performance impact? It turned out that one of our canary tests - the one sensitive to CPU and networking overhead - had caught the 30% drop too! Later, on Jan 13, performance recovered. Did Amazon undo the fixes? We believe so, but have not heard it confirmed.

Performance drops on January 4th and bounces back on January 13th

The single spike just before Jan 13 is a rerun of an old commit. This confirms the conclusion that the change in performance comes from the system, as running a Jan 11 build of MongoDB after Jan 13 will result in higher performance. Therefore the results depend on the date the test was run, rather than which commit was tested.

As the world was scrambling to assess the performance implications of the necessary fixes, we could just sit back and watch them in our graphs. Getting on top of EC2 performance variations has truly paid off.

Update: @msw pointed us to this security bulletin, confirming that indeed one of the Intel microcode updates was reverted on January 13.

April 30, 2019
Engineering Blog

Repeatable Performance Tests: EC2 Instances are Neither Good Nor Bad

In an effort to improve repeatability, the MongoDB Performance team set out to reduce noise on several performance test suites run on EC2 instances. At the beginning of the project, it was unclear whether our goal of running repeatable performance tests in a public cloud was achievable. Instead of debating the issue based on assumptions and beliefs, we decided to measure noise itself and see if we could make configuration changes to minimize it. After thinking about our assumptions and the experiment setup , we began by recording data about our current setup. Investigate the status quo Our first experiment created a lot of data that we sifted through with many graphs. We started with graphs of simple statistics from repeated experiments: the minimum, median, and maximum result for each of our existing tests. The graphs allowed us to concisely see the range of variation per test, and which tests were noisy. Here is a representative graph for batched insert tests: High variance in the performance of batched insert tests In this graph you have two tests, each run three times at different thread levels (this is the integer at the end of the test name). The whiskers around the median, denote the minimum and maximum results (from the 25 point sample). Looking at this graph we observe that thread levels for these two tests aren't optimally configured. When running these two tests with 16 and 32 parallel threads, the threads have already saturated MongoDB, and the additional concurrency merely adds noise to the results. We noticed other configuration problems in other tests. We didn't touch the test configurations during this project, but later, after we had found a good EC2 configuration, we did revisit this issue and reviewed all test configurations to further minimize noise. Lesson learned: When you don't follow the disciplined approach of "Measure everything. Assume nothing." from the beginning, probably more than one thing has gone wrong. 
EC2 instances are neither good nor bad We looked at the first results in a number of different ways. One way showed us the results from all 25 trials in one view: Performance results do not correlate with the clusters they are run on As we examined the results, one very surprising conclusion immediately stood out from the above graph: There are neither good nor bad EC2 instances. When we originally built the system, someone had read somewhere on the internet that on EC2 you can get good and bad instances, noisy neighbours, and so on. There are even tools and scripts you can use to deploy a couple instances, run some smoke tests, and if performance results are poor, you shut them down and try again. Our system was in fact doing exactly that, and on a bad day would shut down and restart instances for 30 minutes before eventually giving up. (For a cluster with 15 expensive nodes, that can be an expensive 30 minutes!) Until now, this assumption had never been tested. If the assumption had been true, then on the above graph you would expect to see points 1-5 have roughly the same value, followed by a jump to point 6, after which points 6-10 again would have roughly the same value, and so on. However, this is not what we observe. There's a lot of variation in the test results, but ANOVA tests confirm that this variation does not correlate with the clusters the tests are run on. From this data, it appears that there is no evidence to support that there are good and bad EC2 instances. Note that it is completely possible that this result is specific to our system. For example, we use (and pay extra for) dedicated instances to reduce sources of noise in our benchmarks. It's quite possible that the issue with noisy neighbours is real, and doesn't happen to us because we don't have neighbours. The point is: measure your own systems; don't blindly believe stuff you read on the internet. 
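To illustrate the kind of check mentioned above, here is a minimal sketch of a one-way ANOVA F statistic in which each group holds the trials from one cluster. The function name and the data are hypothetical, and the post does not say which tool was used for the original analysis.

```python
from statistics import mean


def one_way_anova_f(groups):
    """Return the one-way ANOVA F statistic for a list of groups."""
    k = len(groups)
    n = sum(len(g) for g in groups)
    if k < 2 or n <= k:
        raise ValueError("need at least 2 groups and more samples than groups")
    grand_mean = mean(x for g in groups for x in g)
    # Variation of the cluster means around the overall mean.
    ss_between = sum(len(g) * (mean(g) - grand_mean) ** 2 for g in groups)
    # Variation of individual trials around their own cluster mean.
    ss_within = sum((x - mean(g)) ** 2 for g in groups for x in g)
    if ss_within == 0:
        raise ValueError("no within-group variation; F is undefined")
    return (ss_between / (k - 1)) / (ss_within / (n - k))


# Hypothetical ops/sec results: 5 clusters x 5 trials.
clusters = [
    [98_000, 104_000, 91_000, 110_000, 99_000],
    [102_000, 95_000, 108_000, 93_000, 101_000],
    [97_000, 109_000, 94_000, 103_000, 100_000],
    [105_000, 92_000, 99_000, 107_000, 96_000],
    [100_000, 96_000, 111_000, 90_000, 104_000],
]
f_stat = one_way_anova_f(clusters)
print(f"F = {f_stat:.3f}")
```

With 5 groups of 5 trials the degrees of freedom are (4, 20), for which standard F tables give a 5% critical value of roughly 2.87. An F statistic well below that value is consistent with the cluster having no measurable effect on the results. A library routine such as `scipy.stats.f_oneway` additionally returns a p-value.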
Conclusion

By measuring our system and analyzing the data in different ways, we were able to disprove one of the assumptions we had made when building our system, namely that there are good and bad EC2 instances. As it turns out, the variance in performance between different test runs does not correlate with the clusters the tests are run on.

This is only one of three bigger experiments we performed in our quest to reduce variability in performance tests on EC2 instances. You can read more about the top level setup and results as well as how we found out that EBS instances are the stable option and CPU options are best disabled.

April 30, 2019
Engineering Blog

Repeatable Performance Tests: EBS Instances are the Stable Option

In an effort to improve repeatability, the MongoDB Performance team set out to reduce noise on several performance test suites run on EC2 instances. At the beginning of the project, it was unclear whether our goal of running repeatable performance tests in a public cloud was achievable. Instead of debating the issue based on assumptions and beliefs, we decided to measure noise itself and see if we could make configuration changes to minimize it. After thinking about our assumptions and the experiment setup , we began by recording data about our current setup and found no evidence of particularly good or bad EC2 instances . However, we found that the results of repeated tests had a high variance. Given our test data and our knowledge of the production system, we had observed that many of the noisiest tests did the most IO (being either sensitive to IO latency, or bandwidth). After performing the first baseline tests, we therefore decided to focus on IO performance through testing both AWS instance types and IO configuration on those instances. Investigate IO As we are explicitly focusing on IO in this step, we added an IO specific test (fio) to our system. This allows us to isolate the impact of IO noise to our existing benchmarks. The IO specific tests focus on: Operation latency Streaming bandwidth Random IOPs (IO per second) We look first at the IO specific results, and then our general MongoDB benchmarks. In the below graph, we are graphing the "noise" metric as a percentage computed from (max-min)/median and lower is better. c3.8xlarge with ephemeral storage is our baseline configuration which we were using in our production environment. i2.8xlarge shows best results with low noise on throughput and latency The IO tests show some very interesting results. The c3.8xlarge with EBS PIOPS shows less noise than the c3.8xlarge with its ephemeral disks. This was quite unexpected. 
In fact the c3.8xlarge with ephemeral storage (our existing configuration) is just about the worst choice. The i2.8xlarge looks best all around with low noise on throughput and latency. The c4.8xlarge shows higher latency noise than the c3.8xlarge. We would have expected any difference to favor the c4.8xlarge instances, as they are EBS optimized. After these promising results, we examined the results of our MongoDB benchmarks next. At the time that we did this work, MongoDB had two storage engines (wiredTiger and MMAPv1), with MMAPv1 being the default, but now deprecated, option. There were differences in the results between the two storage engines, but they shared a common trend. c3.8xlarge with PIOPS performs best with all results below 10% noise for the wiredTiger storage engine c3.8xlarge with PIOPS performs best with most results below 10% noise for the mmap storage engine There were no configurations that were best across the board. That said, there was a configuration with below 10% noise for all but 1 test: c3.8xlarge with EBS PIOPS. Interestingly, while i2 was the best for our focused IO tests, it was not for our actual tests. Valuable lessons learned: As far as repeatable results are concerned, the "local" SSDs we had been using performed worse compared to any other alternative we could have possibly chosen! Contrary to popular belief, when using Provisioned IOPS with EBS, the performance is both good in absolute terms, and very very stable! This is true for our IO tests and our general tests. The latency of disk requests does have more variability than the SSD alternatives, but the IOPS performance was super stable. For most of our tests, the latter is the important characteristic. The i2 instance family has a much higher performance SSD, and in fio tests showed almost zero variability. It also happens to be a very expensive instance type. 
However, while this instance type was indeed a great choice in theory, it turns out that our MongoDB test results were quite noisy. Upon further investigation, we learned that the noisy results were due to unstable performance of MongoDB itself. As i2.8xlarge has more RAM than c3.8xlarge, MongoDB on i2.8xlarge is able to hold much more dirty data in RAM. Flushing that much dirty data to disk was causing issues. Switching from ephemeral to EBS disks in production Based on the above results, we changed our production configuration to run on EBS disks instead of ephemeral SSD. (We were already running on c3.8xlarge instance types, which turned out to have the lowest noise in the above comparison, so decided to keep using those.) Performance becomes more stable when using EBS After running with the changes for a couple of weeks, you could clearly see how the day-to-day variation of test results decreased dramatically. This instantly made the entire System Performance project more useful to the development team and MongoDB as a whole. Conclusion Focusing on IO performance proved useful. As it turns out using Ephemeral (SSD) disks was just about the worst choice for our performance test. Instead, using Provisioned IOPS showed the most stable rate results. While i2 instances were the best in our non-MongoDB benchmark tests, they proved less than ideal in practice. This highlights quite clearly that you need to measure your actual system and assume nothing to get the best results. This is the second of three bigger experiments we performed in our quest to reduce variability in performance tests on EC2 instances. You can read more about the top level setup and results as well as how we found out that EC2 instances are neither good nor bad and that CPU options are best disabled . If you found this interesting, be sure to tweet it . Also, don't forget to follow us for regular updates.

April 30, 2019
Engineering Blog

Causal Guarantees Are Anything but Casual

Traditional databases, because they service reads and writes from a single node, naturally provide sequential ordering guarantees for read and write operations known as "causal consistency". A distributed system can provide these guarantees, but in order to do so, it must coordinate and order related events across all of its nodes, and limit how fast certain operations can complete. While causal consistency is easiest to understand when all data ordering guarantees are preserved – mimicking a vertically scaled database, even when the system encounters failures like node crashes or network partitions – there exist many legitimate consistency and durability tradeoffs that all systems need to make. MongoDB has been continuously running — and passing — Jepsen tests for years. Recently, we have been working with the Jepsen team to test for causal consistency. With their help, we learned how complex the failure modes become if you trade consistency guarantees for data throughput and recency. Causal consistency defined To maintain causal consistency, the following guarantees must be satisfied: To show how causal guarantees provide value to applications, let’s review an example where no causal ordering is enforced. The distributed system depicted in Diagram 1 is a replica set. This replica set has a primary (or leader) that accepts all incoming client writes and two secondaries (or followers) that replicate those writes. Either the primary or secondaries may service client reads. 
Diagram 1: Flow of Operations in a Replica Set without Enforced Causal Consistency The client application writes order 234 to the primary The primary responds that it has successfully applied the write Order 234 is replicated from the primary to one of the secondaries The client application reads the orders collection on a secondary The targeted secondary hasn’t seen order 234, so it responds with no results Order 234 is replicated from the primary to the other secondary The client makes an order through the application. The application writes the order to the primary and reads from a secondary. If the read operation targets a secondary that has yet to receive the replicated write, the application fails to read its own write. To ensure the application can read its own writes, we must extend the sequential ordering of operations on a single node to a global partial ordering for all nodes in the system. Implementation So far, this post has only discussed replica sets. However, to establish a global partial ordering of events across distributed systems, MongoDB has to account for not only replica sets but also sharded clusters, where each shard is a replica set that contains a partition of data. To establish a global partial ordering of events for replica sets and sharded clusters, MongoDB implemented a hybrid logical clock based on the Lamport logical clock . Every write or event that changes state in the system is assigned a time when it is applied to the primary. This time can be compared across all members of the deployment. Every participant in a sharded cluster, from drivers to query routers to data bearing nodes, must track and send their value of latest time in every message, allowing each node across shards to converge in their notion of the latest time. The primaries use the latest logical time to assign new times to subsequent writes. This creates a causal ordering for any series of related operations. 
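The clock propagation described above can be sketched with a plain Lamport-style counter. This is a deliberate simplification for illustration: MongoDB's hybrid logical clock is more involved, and the class and variable names here are hypothetical.

```python
class LogicalClock:
    """A plain Lamport-style counter (a simplification of a hybrid clock)."""

    def __init__(self):
        self.time = 0

    def observe(self, received_time):
        """Advance to the latest time carried by an incoming message."""
        self.time = max(self.time, received_time)
        return self.time

    def tick(self):
        """Assign a new, strictly larger time to a write (primaries only)."""
        self.time += 1
        return self.time


client = LogicalClock()
shard_a_primary = LogicalClock()
shard_b_primary = LogicalClock()

# The client writes to shard A; the primary assigns the write a time T1.
shard_a_primary.observe(client.time)
t1 = shard_a_primary.tick()
client.observe(t1)  # the reply carries T1 back to the client

# The client's next write goes to shard B, which has seen no writes yet.
shard_b_primary.observe(client.time)  # the request carries T1
t2 = shard_b_primary.tick()
client.observe(t2)

print(t1, t2)  # prints: 1 2
```

Because every message carries the sender's latest time, the second write is ordered after the first even though the two shards never talked to each other directly.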
A node can use the causal ordering to wait before performing a needed read or write, ensuring it happens after another operation. For a deeper dive on implementing cluster-wide causal consistency, review Misha Tyulenev’s talk . Let’s revisit our example from Diagram 1 but now enforcing causal consistency: Diagram 2: Flow of Operations in a Replica Set with Enforced Causal Consistency The client application writes order 234 to the primary The primary responds that it has successfully recorded the write at time T1 Order 234 is replicated from the primary to one of the secondaries The client application reads after time T1 on a secondary The targeted secondary hasn’t seen time T1, so must wait to respond Order 234 is replicated from the primary to the other secondary The secondary is able to respond with the contents of order 234 Write and read concerns Write concern and read concern are settings that can be applied to each operation, even those within a causally consistent set of operations. Write concern offers a choice between latency and durability. Read concern is a bit more subtle; it trades stricter isolation levels for recency. These settings affect the guarantees preserved during system failures Write concerns Write concern, or write acknowledgement, specifies the durability requirements of writes that must be met before returning a success message to the client. Write concern options are: Only a successful write with write concern majority is guaranteed to be durable for any system failure and never roll back. During a network partition, two nodes can temporarily believe they are the primary for the replica set, but only the true primary can see and commit to a majority of nodes. A write with write concern 1 can be successfully applied to either primary, whereas a write with write concern majority can succeed only on the true primary. However, this durability has a performance cost. 
Every write that uses write concern majority must wait for a majority of nodes to commit before the client receives a response from the primary. Only then is that thread freed up to do other application work. In MongoDB, you can choose to pay this cost as needed at an operation level.

Read concern

Read concern specifies the isolation level of reads. Read concern local returns locally committed data whereas read concern majority returns data that has been reflected in the majority committed snapshot that each node maintains. The majority committed snapshot contains data that has been committed to a majority of nodes and will never roll back in the face of a primary election. However, these reads can return stale data more often than read concern local. The majority snapshot may lack the most recent writes that have not yet been majority committed. This tradeoff could leave an application acting off old data. Just as with write concern, the appropriate read concern can be chosen at an operation level.

Effect of write and read concerns

With the rollout of causal consistency, we engaged the Jepsen team to help us explore how causal consistency interacts with read and write concerns. While we were all satisfied with the feature’s behavior under read/write concern majority, the Jepsen team did find some anomalies under other permutations. While less strict permutations may be more appropriate for some applications, it is important to understand the exact tradeoffs that apply to any database, distributed or not.

Failure scenario examples

Consider the behavior of different combinations of read and write concerns during a network partition where P1 has been partitioned from a majority of nodes and P2 has been elected as the new primary. Because P1 does not yet know it is no longer the primary, it can continue to accept writes. Once P1 is reconnected to a majority of nodes, all of its writes since the timeline diverged are rolled back.
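
To make the partition concrete, here is a small, self-contained sketch of why a partitioned P1 can still acknowledge writes with write concern 1 but not with write concern majority. It is a toy model rather than MongoDB's replication code: the functions `majority_commit_point` and `acknowledged`, the list-of-integers representation of each node's progress, and the example times are all assumptions made for illustration.

```python
def majority_commit_point(applied_times):
    """Return the latest time that a majority of nodes have applied.

    applied_times holds, for each node in the replica set, the latest
    logical time that node is known to have applied.
    """
    majority = len(applied_times) // 2 + 1
    # Sort newest first; the entry at index majority - 1 has been reached
    # by at least `majority` nodes.
    return sorted(applied_times, reverse=True)[majority - 1]


def acknowledged(write_time, applied_times, primary_index, write_concern):
    """Would this primary acknowledge the write under the given write concern?"""
    if write_concern == 1:
        # Write concern 1: the primary only needs to have applied the write itself.
        return applied_times[primary_index] >= write_time
    if write_concern == "majority":
        # Write concern majority: the write must be at or behind the commit point.
        return majority_commit_point(applied_times) >= write_time
    raise ValueError(f"unsupported write concern: {write_concern!r}")


# P1's view after the partition: it applied a write at time 5, but the two
# nodes it can no longer reach were last known to be at time 3.
p1_view = [5, 3, 3]

print(majority_commit_point(p1_view))           # 3
print(acknowledged(5, p1_view, 0, 1))           # True
print(acknowledged(5, p1_view, 0, "majority"))  # False
```

In this model P1's majority commit point stays at 3 for as long as it is cut off, so the write at time 5 can be acknowledged with write concern 1 but never with write concern majority.
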

Diagram 3: Network Partition Timeline

During this time, a client issues a causal sequence of operations as follows:

1. At Time T1 perform a write W1
2. At Time T2 perform a read R1

The following four scenarios discuss the different read and write concern permutations and their tradeoffs.

Read Concern majority with Write Concern majority

Diagram 4: Read Concern majority with Write Concern majority

The write W1 with write concern majority can only succeed when applied to a majority of nodes. This means that W1 must have executed on the true primary’s timeline and cannot be rolled back.

The causal read R1 with read concern majority waits to see T1 majority committed before returning success. Because P1, partitioned from a majority of nodes, cannot progress its majority commit point, R1 can only succeed on the true primary’s timeline. R1 sees the definitive result of W1.

All the causal guarantees are maintained when any failure occurs. All writes with write concern majority prevent unexpected behavior in failure scenarios at the cost of slower writes. For their most critical data, like orders and trades in a financial application, developers can trade performance for durability and consistency.

Read Concern majority with Write Concern 1

Diagram 5: Read Concern majority with Write Concern 1

The write W1 using write concern 1 may succeed on either the P1 or P2 timeline even though a successful W1 on P1 will ultimately roll back.

The causal read R1 with read concern majority waits to see T1 majority committed before returning success. Because P1, partitioned from a majority of nodes, cannot progress its majority commit point, R1 can only succeed on the true primary’s timeline. R1 sees the definitive result of W1. In the case where W1 executed on P1, the definitive result of W1 may be that the write did not commit. If R1 sees that W1 did not commit, then W1 will never commit. If R1 sees the successful W1, then W1 successfully committed on P2 and will never roll back.
This combination of read and write concerns gives causal ordering without guaranteeing durability if failures occur.

Consider a large-scale platform that needs to quickly service its user base. Applications at scale need to manage high-throughput traffic and benefit from low-latency requests. When trying to keep up with load, longer response times on every request are not an option. The Twitter posting UI is a good analogy for this combination of read and write concern. The pending tweet, shown in grey, can be thought of as a write with write concern 1. When we do a hard refresh, this workflow could leverage read concern majority to tell the user definitively whether the post persisted or not. Read concern majority helps the user safely recover. When we hard refresh and the post disappears, we can try again without the risk of double posting. If we see the post after a hard refresh at read concern majority, we know there is no risk of that post ever disappearing.

Read Concern local with Write Concern majority

Diagram 6: Read Concern local with Write Concern majority

The write W1 with write concern majority can only succeed when applied to a majority of nodes. This means that W1 must have executed on the true primary’s timeline and cannot be rolled back.

With read concern local, the causal read R1 may occur on either the P1 or P2 timeline. The anomalies occur when R1 executes on P1 where the majority committed write is not seen, breaking the "read your own writes" guarantee. The monotonic reads guarantee is also not satisfied if multiple reads are sequentially executed across the P1 and P2 timelines. Causal guarantees are not maintained if failures occur.

Consider a site with reviews for various products or services where all writes are performed with write concern majority and all reads are performed with read concern local. Reviews require a lot of user investment, and the application will likely want to confirm they are durable before continuing.
Imagine writing a thoughtful two-paragraph review, only to have it disappear. With write concern majority, writes are never lost if they are successfully acknowledged. For a site with a read-heavy workload, the greater latency of the rarer majority writes may not affect performance. With read concern local, the client reads the most up-to-date reviews for the targeted node. However, the targeted node may be P1 and is not guaranteed to include the client's own writes that have been successfully made durable on the true timeline. In addition, the node’s most up-to-date reviews may include other reviewers' writes that have not yet been acknowledged and may be rolled back.

Read Concern local with Write Concern 1

Diagram 7: Read Concern local with Write Concern 1

The combination of read concern local and write concern 1 has the same issues as the previous scenario but now the writes lack durability. The write W1 using write concern 1 may succeed on either the P1 or P2 timeline even though a successful W1 on P1 will ultimately roll back.

With read concern local, the causal read R1 may occur on either the P1 or P2 timeline. The anomalies occur when W1 executes on P2 and R1 executes on P1 where the result of the write is not seen, breaking the "read your own writes" guarantee. The monotonic reads guarantee is also not satisfied if multiple reads are sequentially executed across the P1 and P2 timelines. Causal guarantees are not maintained if failures occur.

Consider a sensor network of smart devices that does not handle failures encountered when reporting event data. These applications can have granular sensor data that drives high write throughput. The ordering of the sensor event data matters to track and analyze data trends over time. The micro view over a small period of time is not critical to the overall trend analysis, as packets can drop. Writing with write concern 1 may be appropriate to keep up with system throughput without strict durability requirements.
For high-throughput workloads and readers who prefer recency, the combination of read concern local and write concern 1 delivers the same behavior as primary-only operations across all nodes in the system, with the aforementioned tradeoffs.

Conclusion

Each operation in any system, distributed or not, makes a series of tradeoffs that affect application behavior. Working with the Jepsen team pushed us to consider the tradeoffs of read and write concerns when combined with causal consistency. MongoDB now recommends using both read concern majority and write concern majority to preserve causal guarantees and durability across all failure scenarios. However, other combinations, particularly read concern majority and write concern 1, may be appropriate for some applications.

Offering developers a range of read and write concerns enables them to precisely tune consistency, durability, and performance for their workloads. Our work with Jepsen has helped better characterize system behavior under different failure scenarios, enabling developers to make more informed choices on the guarantees and tradeoffs available to them.

October 23, 2018
Engineering Blog
