MonsterDB – Pipelines


This section details the aggregation pipeline and the available operators.

Pipeline Stages

Pipelines allow us to chain together MonsterDB commands to create various processes that are executed by the optimiser or fuzzy matching system. A pipeline consists of a list of stage documents and can be executed either at the command line (CLI) or via the API:

CLI Usage: db.aCollection.aggregate([{...},{...}])
API Usage: cursor = aCollection.aggregate(List<Document>)

Each stage (document) in the pipeline is executed in the order passed. The stages are responsible for the processing defined in the sections below, but generally each stage will consume a set of documents and produce another smaller, larger or empty set of documents. It is normal for the first stage to be a $match step that identifies the documents to be treated by the pipeline; if this is omitted, the optimiser will assume the user wishes to inspect the whole collection.

Some stages will reduce the number of documents produced, for example $match, $group, $bucket, $filter, $limit and $skip; others may increase the number of documents produced, for example $unwind. Some stages will consume the documents and not produce any output at all, for example $out.

Command Line Example:

db.fuzzyCollection.aggregate([{"$group":{"_id":"$LastName","avgSalary":{"$avg":"$salary"},"totalSalary":{"$sum":"$salary"},"totalPeople":{"$sum":1}}}])

API Example:

// build the pipeline as a list of stage documents
List<Document> pipeline = new ArrayList<>();
Document step1 = new Document("$group",
        new Document("_id", "$LastName")
        .append("avgSalary", new Document("$avg", "$salary"))
        .append("totalSalary", new Document("$sum", "$salary"))
        .append("totalPeople", new Document("$sum", 1)));
pipeline.add(step1);

// execute against the open collection handle c and print each result document
for (Document d : c.aggregate(pipeline, new Document())) {
    System.out.println(d);
}

Both CLI and API versions will yield the same results:

{"_id":"Haynes","avgSalary":1006.0,"totalSalary":2012.0,"totalPeople":2.0}
{"_id":"Smith","avgSalary":2013.5,"totalSalary":4027.0,"totalPeople":2.0}

Explanation:

The above pipeline has a single stage, $group, which groups the documents in the collection by the fields defined in _id; all other fields, such as avgSalary, totalSalary and totalPeople, are aggregates computed for each distinct value of _id – in this case LastName.

Lookup example

[{"$lookup":{"localField":"Currency","foreignField":"CURRID","as":"ExchangeRateToday","from":"Exchange"}},
 {"$group":{"_id":"$LastName","totalSalary":{"$sum":{"$multiply":["$salary","$ExchangeRateToday.0.Exchange"]}}}}]

Before aggregating the values by last name, the above example looks up the current exchange rate from another collection in the database and uses it to convert each salary at the rate for the currency in the data.

  • "localField" – the field in the source collection, in this case the one the aggregate command runs against
  • "from" – the collection to look into
  • "foreignField" – the field in the looked-up collection that localField is matched against
  • "as" – the name under which the returned values are stored

You will see from the example that it is possible to nest calculations in the group, including reaching into the ExchangeRateToday list to pull out the Exchange value at the top of the list; in the example, 0 signifies the head of the list.

 

Example Data Used

Examples used in this page:

Collection: People

{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b7bbfd77-dce8-4a92-9b46-2ed2379eaf0a"}

{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"dc2b4007-1d93-43bd-ac13-0385ab0bd79e"}

{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b8db302d-6ba1-429d-9b6a-f50c198845c3"}

{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"2c0bf70f-01d4-45a2-9d33-6b07357c5edf"}

Collection: Exchange

{"CURRID":"USD","Exchange":2,"_id":"01d3c614-8060-4e16-b049-ec7467f8c005"}

$match

Format to use in aggregate: {$match: {query}}
Follows the same usage pattern as the find(query) command in the collection definition. A query has the form:
{ aList.columnName: "a value", numericColumn: {$gt: 0}, textColumn: /regex/ }
...
{ $or: [ {...}, {...} ] }
...
{ $and: [ {...}, {...} ] }
The output produced will be a unique set of documents; a document will only be produced once. If no documents satisfy the query, no output will be produced.
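
The same query document can be passed through a $match stage from the API. A minimal sketch, following the Document pattern of the earlier API example (the open collection handle c is assumed):

// $match stage: keep only People documents where salary > 2000
List<Document> pipeline = new ArrayList<>();
pipeline.add(new Document("$match",
        new Document("salary", new Document("$gt", 2000))));
for (Document d : c.aggregate(pipeline, new Document())) {
    System.out.println(d); // with the example data: the two records with salary 2013 and 2014
}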

$fuzzy

Format to use in aggregate: {$fuzzy: {Index: "indexname"}}
Whilst similar to the find statement, the $fuzzy stage will not use the standard indexes; it uses only the fuzzy indexes, which allows it to find records that appear to have some similarity with the search query. An example of such a query would be:
{Entity:{LegalName:"INTERAGROS VP, a.s."}}
The optimiser will pass the query to the fuzzy interpreter, which will take into account any rules that apply to the filter above and return any records that match it:

Pipeline stage to match using Index Name CompanyNameIdx2:

{"$fuzzy":{"Index":"CompanyNameIdx2"}}
would yield 2 matches:
{"_id":"bb449f05-bebf-4a31-a44c-fe035354b051","Matches":[{"score":100.0,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"3074f531-b7bc-492b-90bd-1158d69eb634"}],"Count":1}

{"_id":"3074f531-b7bc-492b-90bd-1158d69eb634","Matches":[{"score":100.0,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"bb449f05-bebf-4a31-a44c-fe035354b051"}],"Count":1}

These results indicate that there are two identical records matching the match rules, and show the user the designated action that the fuzzy match rules state should be taken with them. The outer _id field is the record that matched something; the _id field inside the Matches list is the _id of the matched record or records.
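
The $fuzzy stage can be run from the API in the same way; a minimal sketch, assuming the same Document and collection classes as the earlier API example:

// run a fuzzy match over the collection using the fuzzy index CompanyNameIdx2
List<Document> pipeline = new ArrayList<>();
pipeline.add(new Document("$fuzzy", new Document("Index", "CompanyNameIdx2")));
for (Document d : c.aggregate(pipeline, new Document())) {
    System.out.println(d); // each result carries a Matches list and a Count
}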

$lookup

Format to use in aggregate: {$lookup: {"localField":"","foreignField":"","as":"","from":""}}
This stage will look up a document in another (or the same) collection and include the matched document as a new object in the current record. If the system fails to find a document in the looked-up collection, the object will not be appended. An API sketch follows the field list below.
{"$lookup":{"localField":"Currency","foreignField":"CURRID","as":"ExchangeRateToday","from":"Exchange"}}
  • "localField" – the field in the source collection, in this case the one the aggregate command runs against
  • "from" – the collection to look into
  • "foreignField" – the field in the looked-up collection that localField is matched against
  • "as" – the name under which the returned values are stored
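
The same stage can be built from the API. A minimal sketch, assuming the same Document classes as the earlier API example:

// join each People document to its Exchange record via the Currency code
Document lookup = new Document("$lookup",
        new Document("localField", "Currency")
        .append("foreignField", "CURRID")
        .append("as", "ExchangeRateToday")
        .append("from", "Exchange"));
List<Document> pipeline = new ArrayList<>();
pipeline.add(lookup);
// documents passed on now carry an ExchangeRateToday list (absent when no match was found)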

$bucket

Format to use in aggregate: {$bucket: {groupBy: expression, boundaries: [0,1,2...], default: "", output: expression}}
This stage will aggregate all incoming documents into buckets according to a numeric grouping. Example:
[{"$bucket":{"groupBy":"$salary","boundaries":[0,1000,2000,3000],"default":"other","output":{"count":{"$sum":1},"allnames":{"$push":"$FirstName"}}}}]
  • "groupBy" – the expression used to bucket the values; here it is a single property of the document, but it could equally be an expression
  • "boundaries" – a list of numerics defining the bucket ranges; the lower value of each range is inclusive, so in this example values such as 0 and 999 land in the same bucket
  • "default" – values that fall outside the ranges in the boundaries are assigned to the default bucket, which is useful for outliers
  • "output" – the record produced for each bucket; its _id field is the bucket's boundary value (in the output below, the salaries of 1006 land in the bucket labelled 2000). Each property of the output is evaluated by the parser and thus can include expressions

Output from this example would look like this:
{"_id":1000.0,"count":0,"allnames":[]}
{"_id":2000.0,"count":2.0,"allnames":["Bob","Albi"]}
{"_id":3000.0,"count":2.0,"allnames":["Bob","Robert"]}

$filter

Format to use in aggregate: {$filter: {query}}
Filter goes beyond the $match stage: because each document may by now have been augmented with new values from lookups and other stages, we can filter on the current document values. As an example, if we add the following filter stage to the $bucket example above:
{"$filter":{"count":{"$gt":0}}}
The effect on the output would be as follows:
{"_id":2000.0,"count":2.0,"allnames":["Albi","Bob"]}
{"_id":3000.0,"count":2.0,"allnames":["Robert","Bob"]}
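
The equivalent stage can be appended from the API; a minimal sketch, assuming the pipeline list built in the $bucket sketch above:

// drop bucket documents whose count is zero before they reach the next stage
pipeline.add(new Document("$filter",
        new Document("count", new Document("$gt", 0))));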

$group

Format to use in aggregate: {$group: {_id: expression, field: expression, field: expression}}
Group is used to aggregate data by a set of dimensions in the document. The types of aggregation currently supported are:

  • $sum – sum of values
  • $avg – average (mean)
  • $max – maximum numeric value
  • $min – minimum numeric value
  • $push – push value to a list (array)
  • $first – first item in a list
  • $last – last item in a list

Each of the operators above can be augmented by expressions. For example, to sum all the salaries in our base currency at the current exchange rate, by last name:
[{"$group":{"_id":"$LastName","totalSalary":{"$sum":{"$multiply":["$salary","$exchangeRate"]}}}}]
The use of the _id field is key here as it defines the aggregate dimension to use; in the example above it is LastName, but it could also be an expression:
[{"$group":{"_id":{"year":{"$year":"$startDate"}},"totalSalary":{"$sum":"$salary"}}}]
Here the group-by dimension is a year, extracted from the startDate field by $year. In our example data set this produces:
{"_id":{"year":2013},"totalSalary":2013.0}
{"_id":{"year":2014},"totalSalary":2014.0}
{"_id":{"year":2012},"totalSalary":2012.0}
You could also decide not to group by any dimension at all, for example:
[{"$group":{"minSalary":{"$min":"$salary"}}}]
Would produce the non-dimensional output:
{"minSalary":1006.0}

$skip

Format to use in aggregate: {$skip: number}
The skip stage will ignore a given number of documents. For this to have any particular use, the user is recommended to ensure that the collection data skip is applied to is in a logical order; otherwise the skipped records will be in the natural order of the index used to recall the data from the collection, which is constant providing you don't change anything. An example is:
[{"$sort":{"ROWID":1}}, {"$skip":1}]
Using our data collection, this would produce the following output:
{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"16d39308-7f62-4414-a659-909623cb6f15"}

{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"2026e226-1165-427b-b784-ea450d04801f"}

{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"1d1f3717-28a2-4043-b0aa-26f6a1417d39"}
You will notice ROWID #1 is missing from our output.

$out

Format to use in aggregate: {$out: "name"}
Out is largely a terminal step in the aggregation pipeline, as it does not produce any output. The out stage writes the documents reaching it to a collection in the current database.
[{"$group":{"_id":"$LastName","totalSalary":{"$sum":"$salary"},"totalPeople":{"$sum":1}}}, {"$out":"Summary"}]
The output to the next stage or the screen will be empty (size=0); however, the collection Summary will contain the following records:
{"_id":"Smith","totalSalary":4027,"totalPeople":2}
{"_id":"Haynes","totalSalary":2012,"totalPeople":2}

$sort

Format to use in aggregate: {$sort: {field: 1/-1, field: 1/-1....}}
Sort determines the order in which values are passed to the next step of the pipeline; it can be used to sort on numeric and non-numeric values in the database. Fields can be combined in the sort document to create a composite sort:
[{"$sort":{"LastName":1,"ROWID":-1}}]
This will sort ascending by LastName and descending by ROWID, yielding the following output:
{"FirstName":"Bob","LastName":"Haynes","ROWID":2,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"1225351c-b5b2-4e0a-93d3-fa51b5beb06f"}

{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"d3a77c1d-942a-4ff4-830b-98d4320c90eb"}

{"FirstName":"Bob","LastName":"Smith","ROWID":4,"salary":2014,"startDate":"Mar 4, 2014 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b4c7c07f-7b9f-49a0-927e-96f5a6fb6adf"}

{"FirstName":"Robert","LastName":"Smith","ROWID":3,"salary":2013,"startDate":"Mar 4, 2013 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"b50ebba3-a08a-42e7-9ffb-04ff853847db"}

$limit

Format to use in aggregate: {$limit: number}
Limit is the opposite of skip: it restricts the output to the first n values, where n is an integer.
[{"$sort":{"ROWID":1}}, {"$limit":1}]
Using the same example as skip but switching to limit will produce the following:
{"FirstName":"Albi","LastName":"Haynes","ROWID":1,"salary":1006,"startDate":"Mar 4, 2012 12:00:00 AM","exchangeRate":2,"Currency":"USD","_id":"fd43e30d-b564-4f4c-b4ae-3d9602025dc6"}
Using skip and limit together: depending on the order of the operations, the results will differ. Skipping 1 and then limiting 1 will produce the 2nd record in the set; limiting 1 and then skipping 1 will produce an empty set, as you would expect.
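
A minimal paging sketch combining the three stages, assuming the same classes as the earlier API example:

// page through People one document per page: stable sort, then skip/limit
List<Document> page2 = new ArrayList<>();
page2.add(new Document("$sort", new Document("ROWID", 1)));
page2.add(new Document("$skip", 1));   // skip past page 1
page2.add(new Document("$limit", 1));  // page size of 1
// with the example data this returns the ROWID 2 document (Bob Haynes)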

$unwind

Format to use in aggregate: {$unwind: "[fieldName.]fieldName"}

You may have an output that includes an array (list) within the document. For example, in the output from our earlier fuzzy search the matches were produced in a list (Matches); if we want to extract the matches and produce pairs of records, we can use the unwind stage:

Original Pipeline:

[{"$fuzzy":{"Index":"CompanyNameIdx2"}}]

Produced:

{"_id":"99e94b88-802a-4b72-adb3-569a396f91fa","Matches":[],"Count":0}<
{"_id":"61542b5e-bcf8-4d7e-a5cf-317d7c4c1d8f","Matches":[],"Count":0}

{"_id":"bb449f05-bebf-4a31-a44c-fe035354b051","Matches":[{"score":100.0,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"3074f531-b7bc-492b-90bd-1158d69eb634"}],"Count":1}

{"_id":"3074f531-b7bc-492b-90bd-1158d69eb634","Matches":[{"score":100.0,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"bb449f05-bebf-4a31-a44c-fe035354b051"}],"Count":1}

By adding a filter and an unwind to the fuzzy pipeline (preceded here by a $match to narrow the input):

[{"$match":{"Entity.LegalForm.EntityLegalFormCode":"8888"}}, {"$fuzzy":{"Index":"CompanyNameIdx2"}}, {"$filter":{"Count":{"$gt":0}}}, {"$unwind":{"path":"Matches"}}]

Would now produce:

{"_id":"bb449f05-bebf-4a31-a44c-fe035354b051","Matches":{"score":100,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"3074f531-b7bc-492b-90bd-1158d69eb634"},"Count":1}

{"_id":"3074f531-b7bc-492b-90bd-1158d69eb634","Matches":{"score":100,"acceptance":"Auto-Match","action":"EID","actionText":"Link","rule":{"canMatchSameSystem":true,"systemMatchType":0,"lowScore":80,"highScore":95,"actionText":""},"_id":"bb449f05-bebf-4a31-a44c-fe035354b051"},"Count":1}

$analyze

Note: this step is generally a terminal stage in processing; it will of course trigger all earlier steps in the pipeline, and then discard the output. The analysis runs against the current data state and is often useful for analysing fuzzy matching indexes and the document standardization that represents the comparable elements of the document (in the rules). It is recommended that you run this after any large load, to determine the accuracy of the standardization and to see if there are any large index keys that could cause a slow matching process.
Format to use in aggregate, for index commands: 
{$analyze: {Index: "indexName".....}}

Standardization commands:
{$analyze: {Standardize: "indexName".....}}

INDEXING

Produce a count of each fuzzy key entry, i.e. the number of records using this key value; keys with a count of 1 are omitted individually, as a count of 1 indicates that only one record (currently) uses the key:
db.fuzzy2.aggregate([{"$analyze": {Index: "CompanyNameIdx2"}}])
An example (abbreviated) result is:
{"key":"GBTRADATANAL826TRADITIONALFUNDSPLCFCMACROGLOBALBONDFUND","count":3}
{"key":"GRIC300IKOS","count":3} 
{"key":"GRΔ300","count":4} 
{"key":"GRALF300ALPHA","count":10} 
{"key":"JPHSBC392HSBC","count":17} 
{"key":"ALLOTHERS","count":86535}
ALLOTHERS indicates the number of key values with a single document attached (singletons) in this index; all other values show the number of documents attached to a fuzzy index key.

To inspect the records attached to a key, supply it as InspectKey:
db.fuzzy2.aggregate([{"$analyze": {Index: "CompanyNameIdx2", "InspectKey": "GRΑ300"}}])
{"Entity":{"LegalForm":{"OtherLegalForm":"LIMITED PARTNERSHIP","EntityLegalFormCode":"8888"},"RegistrationAuthority":{"RegistrationAuthorityEntityID":"38820905000","RegistrationAuthorityID":"RA888888","OtherRegistrationAuthorityID":"GENERAL ELECTRONIC COMMERCIAL REGISTRY (G.E.M.I.)"},"LegalAddress":{"PostalCode":"57008","FirstAddressLine":"2 KM PALEAS SIMAHIKIS ROAD IONIAS-OREOKASTROU 0","City":"THESSALONIKI","Country":"GR"},"TransliteratedOtherEntityNames":[],"HeadquartersAddress":{"City":"THESSALONIKI","PostalCode":"57008","FirstAddressLine":"2 KM PALEAS SIMAHIKIS ROAD IONIAS-OREOKASTROU 0","Country":"GR"},"LegalName":"Α. ΤΣΑΓΚΑΛΙΔΗ ΚΑΙ ΣΙΑ Ε.Ε.","EntityStatus":"ACTIVE","LegalJurisdiction":"GR"},"Registration":{"ValidationAuthority":{"ValidationAuthorityEntityID":"38820905000","ValidationAuthorityID":"RA888888","OtherValidationAuthorityID":"GENERAL ELECTRONIC COMMERCIAL REGISTRY (G.E.M.I.)"},"InitialRegistrationDate":"2016-05-24T00:00:00Z","ManagingLOU":"213800WAVVOPS85N2205","LastUpdateDate":"2016-05-24T14:21:19.517Z","ValidationSources":"FULLY_CORROBORATED","NextRenewalDate":"2017-05-24T00:00:00Z","RegistrationStatus":"LAPSED"},"LEI":"213800OUP6OZNYOU5Y85","Table":"GLEIF","_id":"d5ee5acc-d501-4e10-b0e5-7b7fddf9410c"}
To generate the key values for a document, indicate its _id in GenerateKeyForID:
db.fuzzy2.aggregate([{"$analyze": {Index: "CompanyNameIdx2", "GenerateKeyForID": "d5ee5acc-d501-4e10-b0e5-7b7fddf9410c"}}])
This shows the fuzzy keys generated for the record with this _id:
{"Key":"GRΑ300"}
{"Key":"GRΑΤΣΑΓΚΑΛΙΔΗΚΑΙΣΙΑΕ300"}

STANDARDIZATION

Produce, for each standardized concept, a count of the records carrying a value for it:
db.fuzzy2.aggregate([{"$analyze": {Standardize: "STD_1"}}])
This is an example (a bad one) where none of the concepts cover the entire document set:
{"concept":"Address","count":9797}
{"concept":"Country","count":9797}
{"concept":"Company_Name","count":9797}
{"concept":"RegCoNum","count":9797}
{"concept":"total_documents","count":47000}
And this is an example where the concepts cover the whole document set:
{"concept":"Address","count":22000}
{"concept":"Country","count":22000}
{"concept":"Company_Name","count":22000}
{"concept":"RegCoNum","count":22000}
{"concept":"total_documents","count":22000}
To inspect the match scoring for a record further, you can generate the standardized values that are used in comparison:
db.fuzzy2.aggregate([{"$analyze": {Standardize: "STD_1", "StandardizeForID": "d5ee5acc-d501-4e10-b0e5-7b7fddf9410c"}}])
To extract from the indexes the standardized values that have actually been stored:
db.fuzzy2.aggregate([{"$analyze": {Standardize: "STD_1", "InspectStandardized": "d5ee5acc-d501-4e10-b0e5-7b7fddf9410c"}}])
The result will include one of the following pieces of advice:
{"Action":"db.fuzzy2.rebuildIndex(\"STD_1\")","Cause":"Standardised value was not found"} --- BAD

{"Action":"NONE","Cause":"Stored Standardised value was correct, score=100.0"} ---GOOD