In recent years, big data has become a useful paradigm for taking advantage of multiple sources to find relevant knowledge in real domains (such as the design of personalized marketing campaigns or helping to palliate the effects of several fatal diseases). Big data programming tools and methods have evolved over time from a MapReduce to a pipeline-based archetype. Concretely, the use of pipelining schemes has become the most reliable way of processing and analyzing large amounts of data. To this end, this work introduces bdpar, a new highly customizable pipeline-based framework (using the OOP paradigm provided by the R6 package) able to execute multiple preprocessing tasks over heterogeneous data sources. Moreover, to increase flexibility and performance, bdpar provides helpful features such as (i) the definition of a novel object-based pipe operator (%>|%), (ii) the ability to easily design and deploy new (and customized) input data parsers, tasks, and pipelines, (iii) only-once execution, which avoids reprocessing previously handled information (instances) and guarantees that only new input data and pipelines are executed, (iv) the capability to perform serial or parallel operations according to the user's needs, and (v) the inclusion of a debugging mechanism that allows users to check the status of each instance (and find possible errors) throughout the process.
Social networks and instant messaging applications have arguably become an essential part of the human experience. In fact, nowadays, more than 60% of the population of industrialized countries use these mechanisms to communicate or share information. This phenomenon emerged due to (i) the decline in the cost of computers and storage systems by a factor of more than 200 (Engineering 1984), (ii) an exponential increase in processing speed and computer hardware capabilities (Iansiti and Khansa 1995), (iii) the emergence of high-throughput and fully available communication networks (Dorogovtsev and Mendes 2013), and (iv) certain human needs such as staying interconnected and having permanent access to data (Kabeer 2005).
This scenario has promoted an exponential growth in the amount of data generated and stored in the last decade. Concretely, the latest reports from 2018 showed that around 2.16 EB (exabytes) of data are created every day (Domo-Data 2019; VCloud 2019), and trends show that available information is growing four times faster than the world economy (VCloud 2019). Indeed, 90% of the total world data have been created in the last two years alone (IBM 2019).
In addition to the availability of unlimited sources and tools to generate, exchange, and handle information, the lack of a standardized way of representing data has led to a massive increase in unstructured information. In fact, approximately 80% of the existing data is unstructured (IBM 2019; VCloud 2019). The data obtained from a single source are usually insufficient to carry out a suitable decision-making process. However, the ability to take advantage of the combination of data from multiple (and unstructured) sources requires the execution of preprocessing operations that guarantee a unified data format. The need to facilitate the management and exploitation of vast amounts of heterogeneous data (in terms of data types and formats) within a reasonable elapsed time and in a cost-effective manner led to the emergence of the Big Data era (Mervis 2012; Labrinidis and Jagadish 2012).
Big Data is an abstract concept used to refer to the use of new programming paradigms able to handle large volumes of information and to execute data mining tasks by taking advantage of parallel programming schemes over large computer clusters (Brown et al. 2011; IBM et al. 2011). In this context, MapReduce (Miner and Shook 2012; Wu et al. 2014) is the most popular programming model to develop, execute, and deploy Big Data analyses on large clusters. However, its batch-processing nature forces data to be uploaded to the system (cluster) every time it is analyzed, even when the input data has been previously utilized. This requirement (i) makes this programming paradigm unsuitable when dealing with real-time streaming sources and (ii) prevents full use of the computational capabilities and resources since clusters are idle while the data is being loaded. In order to overcome these limitations, the utilization of pipelining schemes for big data processing was recently introduced (Di Tommaso 2019). This concept (extrapolated from the electronics domain) is focused on dividing the whole data analysis process into a set of computationally simple tasks (O’Donovan et al. 2015) whereby the information required for each task is handled exclusively, which avoids the (pre)loading of unnecessary information. This advantage has prompted the emergence of multiple enterprises offering cloud pipeline-based data analysis services such as BDB Solutions for Big Data (Solutions 2019), AWS Amazon Data Pipeline (Amazon 2019), or Google Cloud Dataflow (Google 2019). These services allow users to build highly customized pipelines using a simple graphical interface, even if their technical skills are basic. Despite the great advantages of these services, their cloud-oriented nature makes customers reluctant to use them due to (i) the full control of the pipeline by the company offering the service, (ii) data privacy and security concerns (since information is processed on a foreign infrastructure), and (iii) the difficulty of assessing and calculating the cost of the computational resources required to process data through the defined pipeline.
In order to cope with these problems, several customers decided to replace third-party cloud-based services with proprietary solutions by designing and implementing their own pipelining tools. Meanwhile, multiple open-source offline Big Data pipeline frameworks emerged from the academic community to make this new paradigm available to everybody (Di Tommaso 2019). However, as can be observed from the list shown in (Di Tommaso 2019), despite the great number of available solutions, the majority are developed using the Java language (>90%), while only a few belong to the R ecosystem. Among these, two packages should be mentioned: repo (Napolitano 2020) and drake (Landau 2018). The former is a data-centered pipeline focused on solving bioinformatics data problems (a specific-purpose application). Conversely, the latter is a generic pipeline tool that provides similar functionality to the GNU Make utility. As can be seen, although both applications are focused on the same concept (pipelines), their target, implementation schema, and provided functionalities are quite divergent. In addition, we found some important issues that are not addressed by existing pipeline tools, such as (i) the lack of a pure object-oriented (OO) implementation to facilitate use and reduce the learning curve for people coming from object-oriented environments, (ii) the absence of an application based on the pipelining concept used by the well-known magrittr package (Bache and Wickham 2020) (focused on UNIX pipes), and (iii) the use of black-box implementations that hamper users' ability to easily trace and debug both the code and the intermediate results.
This scenario motivated us to design and implement bdpar, a framework capable of unifying and preprocessing heterogeneous data through the development and execution of customizable pipelines. To this end, our package allows automating the management of large amounts of information by segmenting data processing into a sequence of simple and indivisible tasks (divide-and-conquer paradigm). Specifically, bdpar allows users to (i) use or develop content extractors (such as SMS or email parsers), (ii) use and implement new preprocessing tasks (pipes), (iii) define customized pipelines (sets of tasks) to achieve the desired (structured) output, (iv) visualize the intermediate results achieved by each instance after being processed by the tasks comprising the pipeline (white-box implementation), (v) prevent the re-execution of previously computed instances and tasks, and finally (vi) execute the pipeline following either a sequential or a parallel paradigm.
This paper provides a full description of the main functionalities and resources of the bdpar package. The current version is 3.0.1, and an updated list (with the whole collection of resources) is available in the vignette document and the reference manual. The following section provides a complete description of the package structure and functionalities. Then, the use of the package is described, and finally, an illustrative case study is provided.
In order to exploit the main advantages and strengths of the
object-oriented paradigm (such as maintainability, modularity, or
inheritance), the bdpar package was fully developed using R6
classes
(package R6 (Chang 2019b)).
Particularly, bdpar was implemented using R6 classes due to their high
performance and ease of use when compared with other alternatives (such
as S3 or S4) (Chang 2019a; Wickham 2019). From an operational point of view, the R6
classes implemented in the bdpar package are divided into three different
categories: (i) data extraction functionalities; (ii) pipe-based
operations; and finally (iii) bdpar framework configuration utilities.
The first category (data extraction methods) comprises all methods responsible for automatically detecting the format and parsing contents according to the inner structure of the data gathered from input sources. The second category encapsulates the functionality of extracting features from the parsed information. By default, the bdpar framework provides a complete data preprocessing flow comprising 18 different tasks. Additionally, bdpar allows for the easy creation of new customized data flows by combining multiple tasks (object-based pipes). Finally, the third category of methods allows handling the configuration parameters needed for the proper operation of both the bdpar framework and some tasks that use third-party functions (such as credentials for rtweet (Kearney 2019) or tuber (Sood 2019)).
In order to improve the readability of the code and facilitate the
comprehension of each implemented class, a naming convention was
adopted. Methods included in the data extraction category are labeled
using Extractor
as a prefix, followed by the type (or structure) of
the input source (e.g., ExtractorEml
and ExtractorSms
methods are
able to parse text contents from emails and SMS, respectively). Finally,
pipe-based functionalities are named using the operation name followed
by the suffix Pipe
(e.g., ToLowerCasePipe
and FindHashtagPipe
are
tasks designed to convert text characters to lowercase and detect
Twitter hashtags from textual contents, respectively).
As stated before, the bdpar framework is focused on designing,
implementing, and deploying customized processing flows for the big data
domain. In order to handle the information obtained from the input
sources, the package uses a specific structure called Instance
, which
is responsible for storing the properties extracted by each pipe
comprising the processing flow. To provide an insightful view of our
bdpar framework, Figure 1 provides a graphical
representation of its inner operation, which is divided into two main
stages: (i) data loading and (ii) pipeline execution.
As shown in Figure 1, the first stage comprises the loading
of the required extractors according to the type of input data. By
default, bdpar provides four different types of extractors: (i)
ExtractorSms is able to extract the textual content exchanged through
the Short Message Service (SMS); (ii) ExtractorTwtid is capable of
obtaining the text from Twitter entries (tweets); (iii) ExtractorEml
can be used to gather raw content from the body of email messages;
and finally (iv) ExtractorYtbid allows extracting the comments
published on the YouTube platform. Additionally, to increase the
compatibility of bdpar with other data formats, the framework allows
for the easy design and deployment of new customized extractors (by
using a simple OOP inheritance relation).
Once the content is successfully extracted from the raw sources, the
second stage is automatically initiated by bdpar. This stage comprises
the execution of two steps: (i) pipeline handling and (ii) output
generation. The former is in charge of performing unified data
processing by executing a specific set of pipes (also named pipeline)
over the previously extracted content. It should be noted that each pipe
included in the pipeline is represented as an object (inherited from
GenericPipe
class) responsible for performing a specific operation
(task) over the input data. The second step (called output generation)
transforms the preprocessed data into a specific output format (e.g.,
into a CSV structure). Moreover, bdpar allows users to develop new
specific output-generation methods to achieve the desired output.
As can be seen from the pipeline handling stage shown in Figure 1, the pipe-based structure provides great flexibility and versatility since it allows users to easily (i) modify existing pipelines by adding or removing pipes, (ii) develop new pipes implementing additional tasks, or (iii) design and deploy new customized pipelines. To assist users in the creation and deployment of new pipelines, bdpar provides a set of 18 combinable pipes implementing basic preprocessing tasks for text sources. As can be seen in Table 1, the tasks included in bdpar are divided into two different categories: (i) transformer tasks, which perform operations that successively alter the original content (such as lowercase conversion or emoticon finding), and (ii) maintainers, which execute operations that do not affect the current content (such as storing the extension of the input data).
Pipe name | Pipe type | Name of computed property | Description |
---|---|---|---|
GuessDatePipe | Transformer | "date" | Obtains the date and time based on the type and structure of the input data. |
File2Pipe | Transformer | "source" | Obtains the source based on the type and structure of the input data. |
FindUserNamePipe | Transformer | "userName" | Detects and extracts usernames from textual sources. |
FindHashtagPipe | Transformer | "hashtag" | Detects and obtains hashtags from input data. |
FindUrlPipe | Transformer | "URLs" | Uses regular expressions to find URLs in text. |
FindEmoticonPipe | Transformer | "emoticon" | Identifies and extracts emoticons from textual sources. |
FindEmojiPipe | Transformer | "Emojis" | Transforms emojis into their textual representation. |
GuessLanguagePipe | Transformer | "language" | Tries to guess the language of a specific text. |
ContractionPipe | Transformer | "contractions" | Transforms previously detected contractions. |
AbbreviationPipe | Transformer | "abbreviation" | Expands detected abbreviations. |
SlangPipe | Transformer | "langpropname" | Identifies slang words and maps them to their corresponding formal speech. |
ToLowerCasePipe | Transformer | – | Converts the input source to lowercase characters. |
InterjectionPipe | Transformer | "interjection" | Detects and extracts interjections from textual sources. |
StopWordPipe | Transformer | "stopWord" | Recognizes and obtains stop words from textual sources. |
TargetAssigningPipe | Maintainer | "target" | Identifies the target class of the data. |
StoreFileExtPipe | Maintainer | "extension" | Guesses the extension of the input data. |
MeasureLengthPipe | Maintainer | "length" | Computes the length of a given text. |
TeeCSVPipe | Maintainer | – | Transforms the final result into a CSV format file. |
Additionally, each pipe provides a property name field where the
computed property will be stored. To increase flexibility, property
names can be specified by users or left at their default values (see the names
described in Table 1). Finally, the pipe definitions in
Table 1 are sorted to match the execution order defined in
the pipeline included by default in bdpar (named DefaultPipeline).
Moreover, to increase the reliability of the pipeline, bdpar allows
defining the execution order of each task comprising the specified
pipeline. During the definition of a pipeline, we should specifically
take into account the possible interdependence between pipes (e.g.,
language-dependent tasks should be executed after GuessLanguagePipe
).
To solve this situation and ensure the proper creation and execution of
the pipelining process, bdpar provides a pipe-orchestration system.
This mechanism is automatically invoked when the pipeline is executed
and traverses all tasks comprising the pipeline to evaluate two types of
interdependencies: (i) always-before (or ‘a priori dependence’) and (ii)
not-after (or ‘a posteriori dependence’). The first constraint is used
when a specific pipe requires the previous execution of other tasks to
ensure its proper operation (for instance, AbbreviationPipe
needs to
know the text language, so it should not be executed before
GuessLanguagePipe
). Conversely, the second type of restriction is used
to indicate the tasks that cannot be started after the execution of the
current one (for instance, the recognition of contractions should not be
run after changing text characters to lowercase or removing punctuation
marks). Both restrictions are automatically managed through the
checkCompatibility
method included in the Instance
class.
Furthermore, in order to ensure the proper execution of tasks
implemented through R6
classes within the pipeline, bdpar implements
an object-oriented customized operator (denoted as %>|%
) inspired by
the implementation of the primitive forward-pipe operator (%>%
)
provided by the magrittr package (Bache and Wickham 2020). Particularly, the
execution of this new operator implies some inner operations such as (i)
discarding an Instance (left-side operand) whenever an error occurs
during the data processing flow, (ii) automatically managing
dependencies between pipes, (iii) simplifying the invocation of the
associated pipe task (right-side operand) by hiding its explicit call,
(iv) facilitating debugging by showing log messages with different
levels of granularity, (v) displaying the intermediate
computation results of each instance throughout the whole preprocessing
flow, and (vi) avoiding the re-execution of previously
processed pipelines. To this end, the operator transparently calls the
pipe method defined in the pipe object. These functionalities
improve the processing capabilities (in terms of speed, performance,
and usability) of the application by preventing the problems derived
from the (potential) existence of errors during the pipelining process
and shorten pipeline definitions by taking advantage of the customized
operator capabilities. To increase the customization capabilities,
bdpar allows easy development and deployment of new user-defined
pipelines. To ensure full compatibility of new user-defined pipelines,
bdpar provides a reference class called GenericPipeline.
Additionally, to simplify the use of the framework, bdpar provides a
predefined pipeline (named DefaultPipeline
) containing all the pipes
included in Table 1.
Finally, once all Instance
objects are processed, the output
generation stage starts. As can be seen from Figure 1, this
stage is responsible for storing the results achieved after executing
the pipeline process over each (valid) Instance
. Although this stage
allows the use of customized storage and output-representation methods
(implemented by the user), bdpar provides two methods able to (i) save the
achieved output into an external CSV file (using TeeCSVPipe
pipe) or
(ii) internally store in memory a set of preprocessed Instance
objects
(default output).
In order to exemplify the structure and operation of an object-based pipeline in bdpar, we have included below a code snippet comprising 13 different text processing tasks followed by a final CSV output task (all implemented as pipe objects).
instance %>|%
  TargetAssigningPipe$new() %>|% StoreFileExtPipe$new() %>|%
  GuessDatePipe$new() %>|% File2Pipe$new() %>|%
  MeasureLengthPipe$new("length_before_cleaning_text") %>|%
  FindUrlPipe$new() %>|% FindEmojiPipe$new() %>|% GuessLanguagePipe$new() %>|%
  SlangPipe$new() %>|% ToLowerCasePipe$new() %>|%
  InterjectionPipe$new() %>|% StopWordPipe$new() %>|%
  MeasureLengthPipe$new("length_after_cleaning_text") %>|% TeeCSVPipe$new()
To facilitate the understanding of the pipelining process, the code
included above assumes that each piece of input data has been successfully loaded
and stored in an Instance object (denoted as instance). As can be
seen from the code snippet, each instance is processed through all the
tasks comprising this pipeline. Specifically, the first 13 perform
different preprocessing operations over each instance, while the
last one stores the achieved results into a CSV file.
The package can be installed and attached as described in the code
included below (please refer to the README
file to access the latest
and development versions).
install.packages("bdpar")
library(bdpar)
Please note that the core functionalities of bdpar require the
previous installation of six R packages (described in the Imports
field of the DESCRIPTION file). In addition, some optional
tasks (mainly belonging to specific data-processing pipes) use certain
packages (indicated in the Suggests field of the DESCRIPTION
file), which should also be installed to ensure their proper
operation. If all the dependencies are needed, the argument
dependencies = TRUE should be passed to install.packages.
In order to guarantee a high level of flexibility, bdpar can be easily executed in two different ways: (i) following an OOP paradigm or (ii) using a classical function-call approach. Below we include code snippets describing both scenarios.
bdpar <- Bdpar$new()
bdpar$execute(path, extractors = ExtractorFactory$new(),
              pipeline = DefaultPipeline$new(), cache = TRUE,
              verbose = FALSE, summary = FALSE)
a) Executing bdpar using OOP paradigm
output <- runPipeline(path, extractors = ExtractorFactory$new(),
                      pipeline = DefaultPipeline$new(), cache = TRUE,
                      verbose = FALSE, summary = FALSE)
b) Executing bdpar following a function-based approach.
As can be seen, both execution methods require the same six
arguments since runPipeline
is a wrapper function which encapsulates
bdpar execution using the OOP paradigm. The first argument is
mandatory since it is used to specify the directory or file(s) path
where the raw input data is located. The second parameter indicates the
extractors required to parse the input sources. If not defined, bdpar
automatically invokes the default ExtractorFactory$new()
object which
initializes the four extractors provided by bdpar framework.
Next, the third argument is used to determine the sequence of
preprocessing tasks that should be executed (pipeline) to achieve the
desired output (featured dataset). If the argument is not assigned,
bdpar executes DefaultPipeline$new()
object which implements an
error-safe pipeline comprising the 18 tasks described in
Table 1. The fourth argument is used to enable (or disable)
the bdpar not-re-execution functionality (defined as N-RE
).
Particularly, this feature is able to detect which instances, tasks, and
even pipelines were previously executed with a view to avoiding their
re-execution. Moreover, the penultimate argument is used to indicate (if
needed) the generation of a log output showing different levels of
granularity (DEBUG
, INFO
, WARN
, ERROR
, FATAL
). It should be
noted that DEBUG
level allows displaying the intermediate results
achieved by each instance after being processed by the tasks comprising
the pipeline. This is very useful to detect the location of possible
errors. Finally, the last argument allows showing (if needed) a
detailed summary of all the operations and tasks performed during the
pipeline execution.
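As an illustrative (hypothetical) invocation, the following call runs the default pipeline over a folder of raw input files with the cache disabled and both the log output and the final summary enabled; only the path argument is mandatory, and the folder name is purely illustrative.

# Hypothetical call: all arguments except `path` keep or override their defaults
output <- runPipeline(path = "raw-data/",          # illustrative input folder
                      extractors = ExtractorFactory$new(),
                      pipeline = DefaultPipeline$new(),
                      cache = FALSE,               # disable the N-RE functionality
                      verbose = TRUE,              # print log messages
                      summary = TRUE)              # show an execution summary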
As previously stated, the design of the software architecture of bdpar is focused on facilitating the customization of any stage of the process (data loading and pipeline execution). Specifically, bdpar allows users to (i) define new types of input data parsers, (ii) create new preprocessing tasks (pipes), and (iii) implement and deploy new customized pipelines.
Regarding the first aspect, the development of new content parsers
involves two stages: (i) the implementation of a customized extractor by
overriding solely the methods of the Instance
class that are necessary
to load the input and (ii) the registration of the created extractor so
it can be loaded by the bdpar framework. Two code fragments detailing
the development and registration of a new customized parser are included
below.
ExtractorImage <- R6::R6Class(
  classname = "ExtractorImage",
  inherit = Instance,
  public = list(
    initialize = function(path) {
      super$initialize(path)
    },
    obtainSource = function() {
      source <- imager::load.image(super$getPath())
      super$setSource(source)
      super$setData(source)
    }
  )
)
a) Implementation of the new ExtractorImage parser.
extractors <- ExtractorFactory$new()
extractors$registerExtractor(extension = c("jpeg", "png"),
                             extractor = ExtractorImage)
b) Dynamic extractor registration operation in bdpar.
As can be seen, the first code snippet describes the formal structure of
a new extractor. Particularly, ExtractorImage
is able to load an image
into an Instance object. To accomplish this task, the obtainSource
method loads (by invoking the load.image() function) an image from the file
path received as a parameter of the class constructor
(super$getPath()
). Then, the loaded image is stored in the source
variable of the Instance
superclass (by invoking super$setSource()
method) and is assigned to the data variable by calling the
super$setData()
method. The data field is used to store the result of
each task comprising the pipeline.
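A minimal usage sketch of this parser could look as follows (the image path is purely illustrative):

# Hypothetical usage of the ExtractorImage parser defined above
img.instance <- ExtractorImage$new(path = "images/sample.png")
img.instance$obtainSource()
img.instance$getData()   # returns the image loaded by imager::load.image()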
Moreover, the second fragment of code exemplifies the registration of
the previously created extractor in the bdpar framework. As can be seen,
this operation is performed by a simple call to the registerExtractor
method included in the ExtractorFactory class. Due to the one-to-one
dependency between each extractor and the different input formats, bdpar
requires the definition of a specific extension (or set of extensions) to
discern which type of extractor should be executed. Also, ExtractorFactory
provides two additional methods: (i) getAllExtractors, which shows all the
extractors registered in the bdpar framework, and (ii) removeExtractor,
which deletes a specific data extractor. Finally, to avoid parsing errors,
input contents not supported by any registered extractor are automatically
ignored by bdpar.
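These management methods can be combined as in the following sketch; the argument form passed to removeExtractor (the file extension) is an assumption.

extractors <- ExtractorFactory$new()
extractors$registerExtractor(extension = c("jpeg", "png"),
                             extractor = ExtractorImage)
extractors$getAllExtractors()       # lists the default and newly registered parsers
extractors$removeExtractor("png")   # assumed call: drops the parser for .png files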
For the creation and deployment of new preprocessing tasks, bdpar
provides an abstract class named GenericPipe
. This type of class is
very common in OOP to ensure all subclasses (pipes) follow the same
structure and implement the methods defined in the superclass
(GenericPipe
). Particularly, GenericPipe
defines two main methods
that should be included in each subclass: (i) initialize
and (ii)
pipe
. The former includes three optional parameters: propertyName, which
refers to the name under which the output value computed by the task is
stored, and alwaysBeforeDeps and notAfterDeps, which
handle the two types of dependencies between pipes ("always-before" and
"not-after", respectively). Finally, the pipe
method is used to
implement the behavior of the new task. Below we include a code snippet
exemplifying how to develop a basic image-preprocessing pipeline.
Concretely, we design three pipes: (i) Image2Pipe, responsible for
invoking the obtainSource method provided in the ExtractorImage
parser, (ii) ImageCroppingPipe, in charge of halving the image, and
(iii) ImageResizePipe, which rotates the image 30 degrees clockwise.
Image2Pipe <- R6::R6Class(
  classname = "Image2Pipe",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list(),
                          notAfterDeps = list()) {
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      instance$obtainSource()
      instance
    }
  )
)

ImageCroppingPipe <- R6::R6Class(
  "ImageCroppingPipe",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list("Image2Pipe"),
                          notAfterDeps = list()) {
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      data <- instance$getData()
      data <- imager::imsub(data, x > height/2)
      instance$setData(data)
      instance
    }
  )
)

ImageResizePipe <- R6::R6Class(
  "ImageResizePipe",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list("Image2Pipe"),
                          notAfterDeps = list()) {
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      data <- instance$getData()
      data <- imager::imrotate(data, 30)
      instance$setData(data)
      instance
    }
  )
)
As can be seen, the Image2Pipe class stores the loaded image into a
new instance. Then, the ImageCroppingPipe and ImageResizePipe classes
apply different image manipulation functions (imager::imsub and
imager::imrotate) to halve and rotate the images, respectively.
The result of each pipe is stored into a specific field of
the Instance object (by calling the instance$setData() method). To
ensure the proper operation of both tasks, the image content should be
previously loaded into an instance object (by invoking the task
associated with Image2Pipe). To this end, an a priori dependency on
Image2Pipe has been defined in the initialize method of both pipes. As
mentioned before, to facilitate the development and execution of pipes,
dependencies between pipes are automatically managed by the
object-oriented pipe operator (%>|%). Finally, in order to develop
robust pipelines, instance objects should be invalidated when an error
occurs or the requirements are not satisfied (such as the storage of
empty data or the non-identification of the textual language).
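As an illustrative sketch of this defensive style (the emptiness check and the class name SafeImageCroppingPipe are assumptions, not part of bdpar), a pipe can invalidate an instance from within its pipe method before returning it:

# Hypothetical variant of ImageCroppingPipe that invalidates the instance
# when no image content is available, so subsequent pipes skip it.
SafeImageCroppingPipe <- R6::R6Class(
  "SafeImageCroppingPipe",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list("Image2Pipe"),
                          notAfterDeps = list()) {
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      data <- instance$getData()
      if (is.null(data) || length(data) == 0) {
        instance$invalidate()    # mark the instance as not processable
        return(instance)
      }
      instance$setData(imager::imsub(data, x > height/2))
      instance
    }
  )
)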
Finally, bdpar allows users to customize existing pipelines and
develop new ones from scratch. In order to motivate the usage of bdpar
regardless of user programming skills, the framework allows new pipelines
to be designed either manually or dynamically. The first method requires the
creation of a new class (inheriting from GenericPipeline
) which
implements the execute
method defined in the parent class. Moreover,
to ensure proper management of (possible) execution errors (such as
invalidated instances), a try-catch function should be included.
Additionally, bdpar allows customizing the log messages (if needed) by
calling the bdpar.log()
function. Below we include an example showing
how a customized pipeline is manually created.
TestPipeline <- R6::R6Class(
  classname = "TestPipeline",
  inherit = GenericPipeline,
  public = list(
    initialize = function() {},
    execute = function(instance) {
      message("[TestPipeline][execute][Info] ", instance$getPath())
      tryCatch(
        instance %>|% Image2Pipe$new() %>|%
          ImageCroppingPipe$new() %>|% ImageResizePipe$new(),
        error = function(e) {
          bdpar.log(message = paste0(instance$getPath(), " :", paste(e)),
                    level = "ERROR",
                    className = class(self)[1],
                    methodName = "execute")
          instance$invalidate()
        }
      )
      return(instance)
    }
  )
)
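Assuming the ExtractorImage parser from the previous section has been registered for image files, this manually defined pipeline could then be executed over a folder of images as follows (the folder name is illustrative):

# Hypothetical execution of the custom pipeline over a folder of images
extractors <- ExtractorFactory$new()
extractors$registerExtractor(extension = c("jpeg", "png"),
                             extractor = ExtractorImage)
output <- runPipeline(path = "images/",           # illustrative input folder
                      extractors = extractors,
                      pipeline = TestPipeline$new(),
                      cache = FALSE)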
On the other hand, the dynamic method allows the creation of custom
pipelines by simply indicating a list containing the pipe objects to be
used. To achieve a higher level of flexibility, the dynamic model
provides two ways of defining pipelines: (i) during the object
instantiation or (ii) by calling the add
function. A simple example
describing how the previous pipeline is created following the dynamic
method is included below.
pipeline <- DynamicPipeline$new(pipeline = list(Image2Pipe$new(),
                                                ImageCroppingPipe$new(),
                                                ImageResizePipe$new()))
a) Pipeline definition during object instantiation.
pipeline <- DynamicPipeline$new()
pipeline$add(list(Image2Pipe$new(), ImageCroppingPipe$new(),
                  ImageResizePipe$new()))
b) Pipeline creation using the add method.
Additionally, the DynamicPipeline class provides six methods able to extend the
pipeline customization capabilities: (i) add(pipe, pos=NULL)
, which
adds new pipe object(s) to the pipeline flow at a certain position (or
at the end if not defined), (ii) removeByPos(pos)
, which removes a
pipe object at a given position, (iii) removeByPipe(pipe.name)
responsible for erasing a pipe
object by name, (iv) removeAll()
capable of releasing all pipe objects from the pipeline, (v) get()
returning a list containing the pipe objects comprising the pipeline,
and finally, (vi) print()
which displays the pipes comprised in the
pipeline.
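For example, a dynamic pipeline could be built and adjusted with these methods as in the following sketch; the method names follow the signatures listed above, while the exact semantics of the position argument and of the name passed to removeByPipe are assumptions.

# Hypothetical manipulation of a dynamically defined pipeline
pipeline <- DynamicPipeline$new()
pipeline$add(list(Image2Pipe$new(), ImageResizePipe$new()))
pipeline$add(list(ImageCroppingPipe$new()), pos = 2)  # insert at an assumed position
pipeline$removeByPipe("ImageResizePipe")              # remove a pipe by class name
pipeline$print()                                      # display the remaining pipes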
As can be seen from both approaches, the manual definition of pipelines
allows users to have greater control and insight over the pipeline (such
as personalizing error-handling methods or the inclusion of new
user-defined object-oriented pipeline operators). Conversely, the
dynamic mode enables users to define optimal pipelines without expert
knowledge of R6
and OOP concepts.
The bdpar.Options object included in bdpar allows managing configuration
parameters to customize the behavior of available tasks and to indicate
parameters needed for the proper operation of pipes and/or content
extractors (such as path locations for slang dictionaries or the credentials
required by the Twitter and YouTube APIs). Moreover, to easily
search and access the configuration, bdpar.Options stores parameters
following a key-value pair structure. As can be deduced, the key
parameter is used to uniquely identify a configuration entry. In order
to facilitate the management of configuration parameters,
bdpar.Options
provides four main methods: (i)
bdpar.Options$add(key, value)
, which adds a new configuration entry,
(ii) bdpar.Options$set(key, value)
used to modify the value of an
existing configuration parameter, (iii) bdpar.Options$remove(key)
,
which removes an entry matching a specific name, and finally, (iv)
bdpar.Options$reset()
used to restore bdpar.Options
to its initial
state (default options). Table 2 describes the configuration
options included in bdpar.
Type | Key | Assigned by default | Description |
---|---|---|---|
API credentials | twitter.consumer.key | x | Set of keys needed to connect to the Twitter and YouTube APIs |
 | twitter.consumer.secret | x | |
 | twitter.access.token | x | |
 | twitter.access.token.secret | x | |
 | youtube.app.id | x | |
 | youtube.app.password | x | |
API cache | cache.youtube.path | x | Path to temporarily place extracted data |
 | cache.twitter.path | x | |
Pipe parameters | teeCSVPipe.output.path | ✓ | Defines the output file path for TeeCSVPipe |
Extractor options | extractorEML.mpaPartSelected | ✓ | Indicates the content type to parse in multipart emails |
N-RE handler | cache.folder | ✓ | Indicates the path to store the intermediate results |
Parallel settings | numCores | ✓ | Selects the number of cores used to execute the pipelines |
Resource files | resources.abbreviations.path | ✓ | Location of the different language dictionaries (slang, contractions, ...) |
 | resources.contractions.path | ✓ | |
 | resources.interjections.path | ✓ | |
 | resources.slangs.path | ✓ | |
 | resources.stopwords.path | ✓ | |
As can be seen from Table 2, configuration options are divided into seven categories: (i) API credentials, (ii) API cache, (iii) pipe parameters, (iv) extractor options, (v) N-RE handler, (vi) parallel settings, and (vii) resource files. It is important to take into account that values for API credentials are not provided by default because there are no publicly available access keys for either YouTube or Twitter (keys are granted only for private use with prior approval). Next, the optional API cache configuration entries are responsible for designating temporary locations to store the information obtained after executing the content extractors. Defining the API cache paths ensures that duplicated sources are executed only once (it avoids parsing duplicated inputs).
Moreover, pipe parameters and extractor options allow specifying
configuration values needed to guarantee the proper execution of pipes
and extractors, respectively. Particularly, the
"extractorEML.mpaPartSelected" entry allows defining which
content type (text/plain or text/html) should be extracted from multipart
emails, while "teeCSVPipe.output.path" indicates the location to store
the CSV file generated by the TeeCSVPipe pipe.
In addition, the not-re-execution handler allows defining the path to
store the information required for the proper operation of the
not-re-execution functionality. Although this feature is very useful
to reduce both unnecessary computation costs and time consumption, it
requires extra storage space. Therefore, bdpar allows the deletion of
the intermediate results by invoking the bdpar.Options$cleanCache() method.
Next, the parallel settings category is used to define the
configuration values that handle parallelization in bdpar. Concretely,
the "numCores" entry allows defining the number of CPU cores to be used
when a pipeline is executed. By default, bdpar is configured following
a sequential paradigm (numCores = 1). Additionally, to ensure proper
use of CPU resources, bdpar provides a mechanism to verify whether or
not the number of assigned CPU cores is compatible with the hardware
specifications. If the assigned number of CPU cores is not valid, bdpar
will be executed using the optimal configuration according to the
hardware specifications.
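For instance, the parallel and output-related settings could be adjusted as follows (the values shown are illustrative):

# Illustrative configuration changes using bdpar.Options
bdpar.Options$set("numCores", 2)                        # use two CPU cores
bdpar.Options$set("teeCSVPipe.output.path", "out.csv")  # illustrative output file
bdpar.Options$get("numCores")                           # check the current value
bdpar.Options$cleanCache()                              # remove cached intermediate results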
The last category comprises the different dictionaries needed to perform multiple language-dependent operations (such as contraction detection or stopword removal). The dictionaries provided by default ensure the compatibility of bdpar with 8 to 50 different languages (depending on the text-mining operation selected).
In order to illustrate the functionality of the bdpar package from a
more realistic perspective, we developed a case study to show the most
frequent words from a heterogeneous dataset collection (containing SMS
and emails). The dataset comprises 20 emails (eml format) and 20 SMS in
plain text from the nutritional and health domain. Moreover, to ensure
straightforward reproducibility, (i) all the resources used are included
in the package, and the pipeline provided by the package
(DefaultPipeline
) was selected to perform the content preprocessing
flow, and (ii) the resulting dataset was stored into a structured CSV file.
Once the pipeline was executed, 18 new columns were generated from the
outputs acquired after executing some pipeline tasks (labeled according
to the property names described in Table 1). For instance,
the execution of FindEmojiPipe
forces the creation (if it does not already exist) of
a new column (named "emojis") containing the emojis found for each
instance (or left blank if none are found).
To carry out the case study, word frequencies were computed over the text content generated after executing all the pipeline tasks (stored in the data column). Additionally, some text-cleaning operations were performed over the preprocessed text prior to computing the word frequencies (visualized using word-cloud plots). Concretely, words were reduced to their stem form (stemDocument), punctuation marks were deleted, and numbers were removed (using the tm package (Feinerer et al. 2008)). Finally, for comparison purposes, frequencies were calculated both individually and jointly (see Figures 2 and 3).
# Execute bdpar framework
bdpar::runPipeline(path = system.file("example", package = "bdpar"),
                   cache = FALSE)

# Load CSV generated after executing bdpar
dataset <- read.csv(file = bdpar.Options$get("teeCSVPipe.output.path"),
                    sep = ";", stringsAsFactors = FALSE)

# Separate instances by type
sms <- dataset[dataset$extension == "tsms", ]
eml <- dataset[dataset$extension == "eml", ]

# Function to clean text and compute frequencies
word.frec <- function(data) {
  corpus <- tm::VCorpus(tm::VectorSource(data))
  corpus <- tm::tm_map(corpus, tm::removePunctuation)
  corpus <- tm::tm_map(corpus, tm::removeNumbers)
  corpus <- tm::tm_map(corpus, tm::stemDocument)
  sorted <- sort(rowSums(as.matrix(tm::TermDocumentMatrix(corpus))),
                 decreasing = TRUE)
  return(data.frame(word = names(sorted), freq = sorted))
}

sms.words <- word.frec(sms$data)
eml.words <- word.frec(eml$data)
all.words <- word.frec(dataset$data)

# Wordcloud for sms and emails
par(mfrow = c(1, 2))
wordcloud::wordcloud(words = sms.words$word, freq = sms.words$freq,
                     min.freq = 1, max.words = 100, random.order = FALSE,
                     rot.per = .5, colors = RColorBrewer::brewer.pal(8, "Dark2"))
wordcloud::wordcloud(words = eml.words$word, freq = eml.words$freq,
                     min.freq = 1, max.words = 100, random.order = FALSE,
                     rot.per = .5, colors = RColorBrewer::brewer.pal(8, "Dark2"))
par(mfrow = c(1, 1))

# Wordcloud for all instances (sms and email)
wordcloud::wordcloud(words = all.words$word, freq = all.words$freq,
                     min.freq = 1, max.words = 100, random.order = FALSE,
                     rot.per = .5, colors = RColorBrewer::brewer.pal(8, "Dark2"))
Figure 2 graphically represents the results achieved after performing an individualized analysis of each dataset (SMS and emails). Conversely, Figure 3 represents the frequencies achieved when analyzing both datasets together. Observing Figure 2, we see that the frequencies of words included in SMS messages are lower than those included in emails. This is mainly due to the limited length of SMS messages (up to 160 characters). Accordingly, the word frequencies in Figure 3 increase considerably, mainly owing to the joint evaluation of both datasets.
Keeping in mind the functionality provided by bdpar (demonstrated through the current case study), it is easy to deduce that the application of data mining techniques over unstructured data could be easily addressed by taking advantage of our framework. Some big data operations that could benefit from bdpar are the clustering of documents using token features, the classification of documents, or the retrieval of documents for specific queries.
In this work, we introduced bdpar, a pipe-based R framework to
facilitate the creation of unified datasets from heterogeneous sources.
Our framework allows users to (i) define new content extractors (data
parsers), (ii) develop and deploy new preprocessing tasks (pipes), and
(iii) define and build customized interconnected task flows (pipelines).
Additionally, to save computational resources and increase execution
speed, bdpar provides an optimized pipe operator (denoted as %>|%
)
capable of aborting the processing of an instance if an error was
detected. Finally, a case study was developed to demonstrate the
capability of the framework to preprocess and unify heterogeneous data
into a single CSV file.
Future work is focused on three main aspects: (i) the development of semantic-based tasks able to exploit the semantic relationships between synsets; (ii) the capability to represent the pipes comprising each pipeline using a graph-based visualization; and (iii) the analysis of textual polarity and sentiment.
The work of Tomás R. Cotos-Yáñez has been partially supported by the projects IN2017-84658-C2-1-R and PID2020-118101GB-I00 of the Spanish Ministry of Industry. David Ruano-Ordás has been supported by a post-doctoral fellowship from Xunta de Galicia (POSB-2021/024). Additionally, the work of José R. Méndez was partially funded by the project Semantic Knowledge Integration for Content-Based Spam Filtering (TIN2017-84658-C2-1-R) from the Spanish Ministry of Economy, Industry, and Competitiveness (SMEIC), State Research Agency (SRA) and the European Regional Development Fund (ERDF). SING group thanks CITI (Centro de Investigación, Transferencia e Innovación) from the University of Vigo for hosting its IT infrastructure. Finally, we thank the reviewers for their thorough and appropriate suggestions to improve the quality of the manuscript.
R6, repo, drake, magrittr, rtweet, tuber, tm
Databases, HighPerformanceComputing, NaturalLanguageProcessing, ReproducibleResearch, WebTechnologies
Text and figures are licensed under Creative Commons Attribution CC BY 4.0. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".
For attribution, please cite this work as
Ferreiro-Díaz, et al., "The bdpar Package: Big Data Pipelining Architecture for R", The R Journal, 2021
BibTeX citation
@article{RJ-2021-065, author = {Ferreiro-Díaz, Miguel and Cotos-Yáñez, Tomás R. and Méndez, José R. and Ruano-Ordás, David}, title = {The bdpar Package: Big Data Pipelining Architecture for R}, journal = {The R Journal}, year = {2021}, note = {https://rjournal.github.io/}, volume = {13}, issue = {1}, issn = {2073-4859}, pages = {131-145} }