Integrating SKIL into Salesforce to Predict Deal Success [2/2]

In the previous post, we configured Salesforce and wrote a custom Salesforce Lightning component to connect to our SKIL instance.

Now we'll develop our data transform process and the model that'll predict the probability of closing the deal for a given opportunity.

Writing ETL and Model Configurations

First, we'll write the transform and model code that estimates the probability of closing the deal for a given opportunity.

Log in to your SKIL instance and create a workspace. Name it SmartProb. Download the JSON file for the notebook available here and create an experiment from the downloaded JSON.

In the second paragraph of the notebook, we create a SkilContext instance that we'll use later to save our model details.

val skilContext = new SkilContext()
val client = skilContext.client

In the next paragraph, we're going to download the CSV data that we'll train on. Zeppelin allows us to write shell scripts in the notebooks, like this:

%sh curl https://skilresources.blob.core.windows.net/demo/Opportunity.csv > /tmp/Opportunity.csv

Once the download finishes, we'll read the file from /tmp and display it as a table:

val text = sc.textFile("/tmp/Opportunity.csv")
println("%table " + text.collect().mkString("\n").replace(",", "\t"))
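
Note that replacing every comma with a tab also splits quoted fields that contain commas (a Description value, for example). Below is a minimal sketch of a quote-aware conversion in plain Scala. The helper names splitCsvLine and toTableRow are hypothetical and not part of the notebook, and escaped quotes inside a field are not handled.

```scala
// Split one CSV line on commas that sit outside double quotes.
// Quote characters themselves are dropped from the output.
def splitCsvLine(line: String): Vector[String] = {
  val fields = Vector.newBuilder[String]
  val current = new StringBuilder
  var inQuotes = false
  for (ch <- line) ch match {
    case '"' => inQuotes = !inQuotes
    case ',' if !inQuotes =>
      fields += current.toString
      current.clear()
    case other => current += other
  }
  fields += current.toString // last field has no trailing comma
  fields.result()
}

// Turn a CSV line into one tab-separated row for Zeppelin's %table display.
def toTableRow(line: String): String = splitCsvLine(line).mkString("\t")
```

In the notebook this could be applied as text.collect().map(toTableRow).mkString("\n") in place of the plain replace call.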


Now, we'll write a schema that describes every column of the CSV file, in header order, so that the transform process can parse it.

val schema = new Schema.Builder()
    .addColumnsString("Id", "IsDeleted", "AccountId", "IsPrivate", "Name", "Description", "StageName", "StageSortOrder")
    // Amount, ExpectedRevenue, and NextStep appear in the CSV header; the column types used here are assumptions.
    .addColumnDouble("Amount")
    .addColumnCategorical("Probability", Arrays.asList("10", "15", "20", "25", "30", "35", "40", "45", "50", "55", "60", "65", "70", "75", "80", "85", "90", "95", "100"))
    .addColumnDouble("ExpectedRevenue")
    .addColumnsString("TotalOpportunityQuantity", "CloseDate")
    .addColumnCategorical("Type", Arrays.asList("New Customer","Existing Customer - Upgrade","Existing Customer - Replacement", ""))
    .addColumnString("NextStep")
    .addColumnCategorical("LeadSource", Arrays.asList("Web", "External Referral", "", "Employee Referral", "Trade Show", "Partner", "Purchased List", "Word of mouth", "Public Relations", "Phone Inquiry"))
    .addColumnCategorical("IsClosed", Arrays.asList("0", "1"))
    .addColumnCategorical("IsWon", Arrays.asList("0", "1"))
    .addColumnCategorical("ForecastCategory", Arrays.asList("Pipeline", "Closed"))
    .addColumnCategorical("ForecastCategoryName", Arrays.asList("Pipeline", "Closed"))
    .addColumnsString("CampaignId", "HasOpportunityLineItem", "Pricebook2Id", "OwnerId", "CreatedDate", "CreatedById", "LastModifiedDate", "LastModifiedById", "SystemModstamp", "LastActivityDate", "LastStageChangeDate", "FiscalYear")
    .addColumnCategorical("FiscalQuarter", Arrays.asList("1", "2"))
    .addColumnsString("PrimaryPartnerAccountId", "ContractId", "DeliveryInstallationStatus__c", "TrackingNumber__c", "OrderNumber__c", "CurrentGenerators__c", "MainCompetitors__c")
    .build()

After that, we're going to write the transform process to vectorize the data to a format that a neural network can read.

val tp = new TransformProcess.Builder(schema)
            .removeColumns("Id", "IsDeleted", "AccountId", "IsPrivate", "Name", "Description", "StageName", "StageSortOrder")
            .removeColumns("TotalOpportunityQuantity", "CloseDate", "NextStep")
            .removeColumns("CampaignId", "HasOpportunityLineItem", "Pricebook2Id", "OwnerId", "CreatedDate", "CreatedById", "LastModifiedDate", "LastModifiedById", "SystemModstamp", "LastActivityDate", "LastStageChangeDate", "FiscalYear")
            .removeColumns("PrimaryPartnerAccountId", "ContractId", "DeliveryInstallationStatus__c", "TrackingNumber__c", "OrderNumber__c")
            // ConditionFilter drops every row that matches the condition (here, rows where IsClosed is "1").
            .filter(new ConditionFilter(new CategoricalColumnCondition("IsClosed", ConditionOp.Equal, "1")))
            .removeColumns("IsClosed", "IsWon", "ForecastCategory", "ForecastCategoryName", "CurrentGenerators__c", "MainCompetitors__c")
            // Assumption: these two conversions are inferred from the 18 inputs, 19 classes, and label index 1 used later.
            .categoricalToInteger("Probability")
            .categoricalToOneHot("Type", "LeadSource", "FiscalQuarter")
            .build()
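
To make "vectorize" concrete: a categorical column such as Type is commonly turned into a one-hot vector, and a categorical label such as Probability into a class index. The following is a plain Scala illustration of that idea under those assumptions, not the DataVec implementation. The helper names oneHot and toIndex are hypothetical; the category list is copied from the schema above.

```scala
// Category list copied from the schema's "Type" column.
val typeCategories = Vector(
  "New Customer",
  "Existing Customer - Upgrade",
  "Existing Customer - Replacement",
  ""
)

// Encode a category as a 0/1 vector with a single 1 at the category's position.
def oneHot(value: String, categories: Seq[String]): Vector[Double] = {
  require(categories.contains(value), s"unknown category: '$value'")
  categories.map(c => if (c == value) 1.0 else 0.0).toVector
}

// Encode a category as its position in the list (useful for a class label).
def toIndex(value: String, categories: Seq[String]): Int = {
  val idx = categories.indexOf(value)
  require(idx >= 0, s"unknown category: '$value'")
  idx
}

val encodedType = oneHot("Existing Customer - Upgrade", typeCategories)
```

Each one-hot column adds as many network inputs as it has categories, which is why the category lists in the schema determine the width of the input layer.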

Next, we'll run this transform process to parse our input data:

// Read with the Scala SparkContext so that the Scala filter lambda and toJavaRDD() below both apply.
val raw = sc.textFile("/tmp/Opportunity.csv")
val text = raw.filter((line: String) => line != """"Id","IsDeleted","AccountId","IsPrivate","Name","Description","StageName","StageSortOrder","Amount","Probability","ExpectedRevenue","TotalOpportunityQuantity","CloseDate","Type","NextStep","LeadSource","IsClosed","IsWon","ForecastCategory","ForecastCategoryName","CampaignId","HasOpportunityLineItem","Pricebook2Id","OwnerId","CreatedDate","CreatedById","LastModifiedDate","LastModifiedById","SystemModstamp","LastActivityDate","LastStageChangeDate","FiscalYear","FiscalQuarter","PrimaryPartnerAccountId","ContractId","DeliveryInstallationStatus__c","TrackingNumber__c","OrderNumber__c","CurrentGenerators__c","MainCompetitors__c"""")
val rr = new CSVRecordReader()
val parsedInputData = text.toJavaRDD().map(new StringToWritablesFunction(rr))
val processedData = SparkTransformExecutor.execute(parsedInputData, tp)

Now, we'll write another transform process to normalize our training data set.

import org.datavec.spark.transform.AnalyzeSpark

val dataAnalysis = AnalyzeSpark.analyze(tp.getFinalSchema(), processedData, 10)

val tp2 = new TransformProcess.Builder(tp.getFinalSchema())
    .normalize("Amount", Normalize.Standardize, dataAnalysis)
    .normalize("ExpectedRevenue", Normalize.Standardize, dataAnalysis)
    .build()
val normalizedTrainData = SparkTransformExecutor.execute(processedData, tp2)
// Arguments: label column index 1, 19 label classes, false = classification (not regression).
val trainSet = normalizedTrainData.map(new DataVecDataSetFunction(1, 19, false)).collect()
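
For intuition, standardizing a column means subtracting its mean and dividing by its standard deviation, using statistics gathered in the analysis step. Here is a minimal sketch in plain Scala. The function names and the sample values are made up for illustration, and the use of the sample standard deviation (dividing by n - 1) is an assumption rather than a statement about DataVec's internals.

```scala
// Mean and sample standard deviation of a numeric column.
def meanAndStd(values: Seq[Double]): (Double, Double) = {
  require(values.size > 1, "need at least two values")
  val mean = values.sum / values.size
  val variance = values.map(v => math.pow(v - mean, 2)).sum / (values.size - 1)
  (mean, math.sqrt(variance))
}

// Rescale every value to (value - mean) / std.
def standardize(values: Seq[Double]): Seq[Double] = {
  val (mean, std) = meanAndStd(values)
  values.map(v => (v - mean) / std)
}

// Hypothetical deal amounts, not taken from Opportunity.csv.
val amounts = Seq(1000.0, 2000.0, 3000.0)
val scaled = standardize(amounts)
```

After this rescaling, Amount and ExpectedRevenue sit on a comparable scale to the 0/1 categorical inputs, which generally helps gradient-based training.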

With that, we have the data that our network can train on. We'll now create the train dataset iterator and configure our model, then train on the data and get the evaluation results.

val inputNum = 18 // number of input features
val outputNum = 19 // number of output classes
val batchSize = 4 // mini-batch size
val rngSeed = 123 // random number seed for reproducibility
val numEpochs = 15 // number of passes over the training data
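
The values 18 and 19 can be checked against the schema. The sketch below assumes that the two numeric columns are kept as single inputs, that the three remaining categorical feature columns are one-hot encoded, and that Probability is the label. The variable names are illustrative only.

```scala
// Numeric feature columns: one input each.
val numericFeatures = Seq("Amount", "ExpectedRevenue")

// Categorical feature columns and the number of categories each has in the schema.
val categoricalFeatureSizes = Map(
  "Type" -> 4,
  "LeadSource" -> 10,
  "FiscalQuarter" -> 2
)

// The label classes: "10", "15", ..., "100", as listed for the Probability column.
val probabilityClasses = (10 to 100 by 5).map(_.toString)

val expectedInputs = numericFeatures.size + categoricalFeatureSizes.values.sum
val expectedOutputs = probabilityClasses.size
```

Under those assumptions, expectedInputs matches inputNum and expectedOutputs matches outputNum. If a category list in the schema changes, both constants need to be updated with it.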

import org.deeplearning4j.datasets.iterator.ExistingDataSetIterator

val trainData = new MultipleEpochsIterator(numEpochs, new ExistingDataSetIterator(trainSet))

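// One softmax output layer (logistic regression); seed, sizes, and activation are assumed from the values above.
val conf: MultiLayerConfiguration = new NeuralNetConfiguration.Builder()
    .seed(rngSeed)
    .updater(new Nesterovs(0.006, 0.9))
    .list()
    .layer(0, new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD)
        .nIn(inputNum).nOut(outputNum)
        .activation(Activation.SOFTMAX)
        .build())
    .build()

val model = new MultiLayerNetwork(conf)
model.init()
model.fit(trainData)

val evaluation = model.evaluate(trainData)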
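
The output layer is trained with a negative log-likelihood loss. That loss is typically paired with a softmax activation, which is an assumption here since the source does not name the activation. A small plain Scala sketch of the two pieces, with hypothetical function names:

```scala
// Convert raw scores into probabilities that sum to 1.
// Subtracting the maximum first keeps exp() from overflowing.
def softmax(logits: Seq[Double]): Seq[Double] = {
  val maxLogit = logits.max
  val exps = logits.map(z => math.exp(z - maxLogit))
  val total = exps.sum
  exps.map(_ / total)
}

// Loss for one example: minus the log of the probability given to the true class.
def negativeLogLikelihood(probs: Seq[Double], trueClass: Int): Double =
  -math.log(probs(trueClass))

// Two classes with equal scores receive probability 0.5 each.
val probs = softmax(Seq(0.0, 0.0))
val loss = negativeLogLikelihood(probs, trueClass = 0)
```

The loss shrinks toward zero as the model assigns more probability to the correct class, so minimizing it pushes the predicted distribution toward the observed Probability values.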

Now, we'll save this evaluation result with our model to create a model performance history.

val modelId = skilContext.addModelToExperiment(z, model, tp2, "SmartProb LR new")
val evalId = skilContext.addEvaluationToModel(z, modelId, evaluation, "LR " + numEpochs + " epochs")

We'll also save our transform process so that we can deploy it later.

val tp3 = new TransformProcess.Builder(tp.getFinalSchema())
    .normalize("Amount", Normalize.Standardize, dataAnalysis)
    .normalize("ExpectedRevenue", Normalize.Standardize, dataAnalysis)
    .build()
val inferenceJson = tp3.toJson()

import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets

Files.write(Paths.get("/tmp/inferenceTP.json"), inferenceJson.getBytes(StandardCharsets.UTF_8))
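
Before importing the file during deployment, it can be worth confirming that it was written intact. The sketch below uses only java.nio; writeAndVerify is a hypothetical helper, and it writes to a temporary file with a placeholder JSON string rather than to /tmp/inferenceTP.json.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Write the JSON, read it back, and report whether the round trip preserved it.
def writeAndVerify(path: Path, json: String): Boolean = {
  Files.write(path, json.getBytes(StandardCharsets.UTF_8))
  val readBack = new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
  readBack == json
}

// Placeholder content; in the notebook this would be inferenceJson.
val tmpFile = Files.createTempFile("inferenceTP", ".json")
val roundTripOk = writeAndVerify(tmpFile, """{"example": true}""")
```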

Run all the paragraphs to execute the transform and model code.

Deploying Data Pipelines and Models

After executing all the notebook code, we're going to deploy the model and the data transform.

Deploying the Model

In the model tab, select the model you just saved.


Click the deploy button, and go through the deployment process.


Follow the images below to deploy the model:


Deploying the Data Transform

Navigate to the deployments page and select the available deployment. In the ETL portion, click "import" and specify the file name of the transform process you saved (file:///tmp/inferenceTP.json here).

Follow the image below for the details.

Starting the Model and Transform Server

Click "start" on both the model server and transform server to begin serving through the endpoint's REST API.


Viewing the Results

Now refresh the page containing your opportunity details. You'll see the predicted probability of a close appearing in the SmartProb component you just added! Happy hunting.