At Proton, data are our bread and butter. At its core, Proton exists to help our distribution clients make sense of their data. As we grow, with both new clients and exciting new features for current clients, we gain access to more data from more sources. That makes it imperative to have an established process for receiving, processing, and storing data so that it can power our systems and AI models.

Our clients have very complicated data flows. Their data usually come from different sources, in many files and formats; for some clients we handle up to 20 data sources. Ingesting each source may involve multiple steps, from filtering invalid orders to storing the results in multiple data layers. Our job is to make this process reliable, efficient, and scalable, so that clients feel comfortable adopting more of our features and have a better overall experience integrating with and using Proton. Let’s take a look at how we do it.

Getting Started

After we’ve dotted the Is and crossed the Ts of a client contract, we start by establishing reliable data transfer. We make this process as easy as possible for our clients.

We have two ways of doing the exchange. Often, we use our iPaaS partner Workato to establish a direct connection to our clients’ internal systems. This makes the process easier for them, since it requires less work on their side, and it’s also better for us because we can poll for data at more frequent intervals. Alternatively, our clients may provide regular data file exports from their systems. The amount of data and the number of files we receive largely depend on the number of Proton features we are implementing for the client.

We’ll save the Workato system for another time. Today, we’re going to talk more about the second way: integrating with exported data files.

Getting the Data

First, we need access to the files every day. Clients transfer their files to our FTP server in Google Cloud Storage, usually once per day.

Next, we need to ingest the data in those files. For that, we need a scheduling system that can organize all the different tasks for each client. Each ingest has many steps, such as weeding out invalid orders or removing duplicates, that need to be coordinated. We use Apache Airflow to keep everything in harmony.

For those of you unfamiliar with it, Airflow is a popular workflow orchestration framework. You set up workflows (called DAGs, short for Directed Acyclic Graphs) where each node of the graph represents one task or job to execute. In our setup, we have one DAG per client, and each data source that needs to be ingested is represented as a separate task. In Airflow terms, Operators determine what gets done in a task: you can call a Python function, execute a SQL command, or even run a Kubernetes Job. In fact, all the jobs triggered from our Airflow instances use the Kubernetes operator, which simply means that everything runs on our Kubernetes cluster. Each job spins up a Dockerized Spark container that executes the code. Running jobs in flexible containers means we are not wasting money: we can adjust resources to each client’s specific needs. You can read more about our usage of Kubernetes in this wonderful post from my colleague Olivia.
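
To make that concrete, here’s a rough sketch of what one of our per-client DAGs could look like. Everything in it is illustrative: the client name, image, data sources, and namespace are made up, and the exact import path for the Kubernetes operator depends on your Airflow version.

```python
# Hypothetical per-client DAG. Client name, image, sources, and namespace are
# made up; this import path is for the cncf.kubernetes provider package.
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="acme_ingest",            # one DAG per client
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",      # exports arrive roughly once per day
    catchup=False,
) as dag:
    # One task per data source; each task launches a Dockerized Spark job
    # on the Kubernetes cluster.
    for source in ("products", "orders", "inventory"):
        KubernetesPodOperator(
            task_id=f"ingest_{source}",
            name=f"acme-ingest-{source}",
            namespace="ingest",
            image="registry.example.com/spark-ingest:latest",
            arguments=["--client", "acme", "--source", source],
            get_logs=True,
        )
```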

Processing the Data

Now that we’ve covered the process, let’s talk about the processing. To ingest data we need to:

  1. Read
  2. Map
  3. Transform
  4. Validate
  5. Save

For the read, we often need to load enormous data files, tens of gigabytes each. We cannot read and process row by row, since some of our transformations require comparing records against each other. To get around this, we use Apache Spark, a large-scale data processing platform that can handle enormous amounts of data in parallel. It includes parallelized readers that split the input and read it across many workers at once.
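
As a rough illustration, here is how a read like that could look in PySpark; the bucket path and file format are hypothetical.

```python
# Hypothetical read step with PySpark; the path and options are made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("acme-product-ingest").getOrCreate()

# Spark splits the file into partitions and reads them in parallel across
# executors, so a multi-gigabyte export never has to fit on one machine.
products_raw = spark.read.csv(
    "gs://proton-ingest/acme/products.csv",  # hypothetical location
    header=True,
    inferSchema=True,
)
```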

The second step is mapping the data, and this is the simplest step. At this point, we are usually just extracting the fields we need from the source and mapping them to our internal fields. For example, we take a customer’s “product_name” field and map it to our “name” label.
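
In PySpark terms, a mapping like that could look something like this (the client’s column names here are hypothetical):

```python
# Hypothetical mapping step: keep only the columns we need and rename them
# to our internal field names, e.g. the client's "product_name" -> our "name".
products = products_raw.selectExpr(
    "product_sku as sku",
    "product_name as name",
    "brand_name as brand",
    "unit_price as price",
)
```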

Once we have mapped the necessary fields, we move to the third step, data transformation. Transformations can be simple or complex. A simple transformation might append something to a string, split a list, or strip characters or whitespace. The more complex transformations combine multiple fields or require cross-referencing another data source. Multi-field transformations take several mapped fields, combine them, and create a new field.
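
Here’s a sketch of what those transformations might look like, continuing with the hypothetical columns from above:

```python
from pyspark.sql import functions as F

# Two hypothetical transformations: a simple one (trim whitespace from the
# name) and a multi-field one (combine brand and name into a display label).
products = (
    products
    .withColumn("name", F.trim(F.col("name")))
    .withColumn("display_name", F.concat_ws(" ", F.col("brand"), F.col("name")))
)
```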

Cross-referencing an additional data source is actually the most complex part of an ingest. For example, besides the base product data file, other product data may arrive in separate files: category mappings, inventory counts, manufacturer data. When processing the base product file, we also need to reference those other files to assemble complete product data. To do this efficiently, we join everything together: Spark reads each additional data source, maps it to internal fields, and performs joins against shared fields. A full ingest usually requires many of these merges.
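
Continuing the hypothetical example, such a cross-reference join could look like this:

```python
# Hypothetical cross-reference: enrich the base product data with a separately
# exported inventory file, joining on the shared "sku" field.
inventory = spark.read.csv(
    "gs://proton-ingest/acme/inventory.csv",  # hypothetical location
    header=True,
).selectExpr("item_sku as sku", "qty_on_hand as inventory_count")

products = products.join(inventory, on="sku", how="left")
```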

The fourth step is validation. Here we make sure that mandatory data fields are present and filter out duplicates. We also check that we are not overwriting data we have already ingested.
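
A minimal sketch of that validation, again with the hypothetical fields from above:

```python
# Hypothetical validation: require the mandatory fields and drop duplicate SKUs.
valid_products = (
    products
    .filter(F.col("sku").isNotNull() & F.col("name").isNotNull())
    .dropDuplicates(["sku"])
)
```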

The last step is saving the data. Often, this is as simple as writing to a single database, but not always. The main users of our system are sales reps who work at the speed of light, so for Proton to be an effective tool, it needs to be really fast. Because we ingest so much data, we have to cache the data that is used most frequently; otherwise our system would be too slow. For example, we store specific transformations of our data in Redis in addition to writing to our main databases.
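
To illustrate, here is a sketch of a save step that writes to a relational database and caches a projection in Redis. The connection details, table name, and key format are all made up.

```python
# Hypothetical save step: persist to the main database over JDBC, then cache a
# frequently used projection in Redis. All connection details are made up.
import json

import redis

valid_products.write.jdbc(
    url="jdbc:postgresql://db.internal:5432/proton",  # hypothetical connection
    table="products",
    mode="append",
)

r = redis.Redis(host="cache.internal", port=6379)
for row in valid_products.select("sku", "name", "inventory_count").toLocalIterator():
    r.set(f"product:{row['sku']}", json.dumps(row.asDict()))
```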

Conclusion

At Proton, we have a lot of data to ingest, and it needs to work every time. We use Airflow DAGs to orchestrate a huge number of steps, from ingesting giant volumes of data to managing writes to multiple datastores. Looking forward, we are always trying to upgrade and improve this pipeline. We’re moving to Airflow 2.0 and hope to explore the new opportunities it offers, and we already have a Kafka cluster that we are actively looking to fold into the process for added reliability. Kafka in particular represents a key part of our effort to switch to a more direct data transfer model.