admin管理员组

文章数量:1290896

I'm trying to obtain the Microsoft Secure Score data using the Microsoft Graph API (the `security/secureScores` endpoint), and I'm currently seeing discrepancies between the scores I get and the scores displayed in Microsoft Defender.

Here is my current code using PySpark:

import requests
import json

# App-registration credentials used for the OAuth 2.0 client-credentials flow.
# Left blank in this snippet; fill them in before running (preferably from a
# secret store rather than hard-coding them).
tenant_id = ""
client_id = ""
client_secret = ""

# The token endpoint has to be an absolute URL: with only a path, requests
# rejects the call (no scheme supplied) before anything is sent.
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

token_data = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
    # Client-credentials tokens are requested for the resource's ".default"
    # scope; for Microsoft Graph the resource is https://graph.microsoft.com.
    "scope": "https://graph.microsoft.com/.default",
}

# Exchange the client credentials for a bearer token; any non-200 answer is
# surfaced with the response body so the failure reason is visible.
response = requests.post(token_url, data=token_data)
if response.status_code == 200:
    access_token = response.json().get("access_token")
else:
    raise Exception(f"Error to obtain the token: {response.text}")

# secureScores returns a list of score snapshots, not a single score.
# Aggregating over all of them mixes days together, while Microsoft Defender
# shows the most recent one, so only one snapshot is requested.
# NOTE(review): relies on Graph returning the newest snapshot first, as in the
# documented "$top=1" example — confirm against the tenant's response.
api_url = "https://graph.microsoft.com/v1.0/security/secureScores?$top=1"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

api_response = requests.get(api_url, headers=headers)
if api_response.status_code == 200:
    data = api_response.json().get("value", [])
else:
    raise Exception(f"Error to call API: {api_response.text}")

# Without rows there is no DataFrame to build; stop here with a clear message
# instead of failing later with a NameError on the undefined `df`.
if not data:
    raise Exception("no data from API.")

df = spark.createDataFrame(data)

# Serialise every map/array column to a JSON string so that nested fields can
# be re-parsed afterwards with an explicit schema.
# NOTE(review): MapType, ArrayType, to_json and col are not imported in this
# snippet — presumably imported in an earlier notebook cell; confirm.
for field in df.schema.fields:
    if isinstance(field.dataType, (MapType, ArrayType)):
        df = df.withColumn(field.name, to_json(col(field.name)))


# Fields read from each element of the JSON-encoded "controlScores" array.
# All of them are parsed as nullable strings.
control_score_fields = [
    "controlCategory",
    "lastSynced",
    "score",
    "implementationStatus",
    "controlName",
    "description",
    "scoreInPercentage",
]

control_scores_schema = ArrayType(
    StructType(
        [StructField(field_name, StringType(), True) for field_name in control_score_fields]
    )
)

# Replace the JSON string column with its parsed array-of-struct form.
parsed_control_scores = from_json(col("controlScores"), control_scores_schema)
df_1 = df.withColumn("controlScores", parsed_control_scores)

# One output row per element of the "controlScores" array.
exploded_control = explode("controlScores")
df_2 = df_1.withColumn("controlScore", exploded_control)

# Snapshot-level columns are kept as-is; control-level fields are pulled out
# of the exploded struct and exposed under their own names.
snapshot_columns = ['createdDateTime', 'currentScore', 'maxScore']
control_fields = [
    "lastSynced",
    "controlCategory",
    "controlName",
    "description",
    "scoreInPercentage",
]

df_3 = df_2.select(
    *snapshot_columns,
    *[col(f"controlScore.{field_name}").alias(field_name) for field_name in control_fields],
)

# Overall score: mean of currentScore / maxScore over all rows, as a percentage.
overall_percentage = avg(col('currentScore') / col('maxScore')) * 100
display(df_3.agg(overall_percentage))

# Per-category score: plain mean of each control's scoreInPercentage.
# NOTE(review): this weights every control equally; if the portal figure is
# points achieved / points possible, the two will differ — confirm.
category_percentages = df_3.groupBy('controlCategory').agg(avg('scoreInPercentage'))
display(category_percentages)

However, I'm not getting the same scores as shown in Microsoft Defender.

I researched and found this article on Microsoft Docs: How to calculate Identity SecureScore via Graph API.

I tried implementing it, but I'm still unable to obtain the same scores (both Overall and per Category).

Additionally, I can access the averageComparativeScores to see the comparison between my organization and similar organizations, but I also cannot get the same comparison values.

Has anyone faced a similar issue? Could you help me figure out where I'm going wrong?

Desired output:

Any suggestions or guidance would be greatly appreciated!

Thank you in advance!

UPDATE:

I revised the code and, for example, I was able to obtain the same value for the Data category, but for the other categories, I was not able to. The difference is still significant:

code:

    import requests
    import json
    
    # App-registration credentials used for the OAuth 2.0 client-credentials
    # flow. Left blank in this snippet; fill them in before running.
    tenant_id = ""
    client_id = ""
    client_secret = ""

    # The token endpoint has to be an absolute URL: with only a path, requests
    # rejects the call (no scheme supplied) before anything is sent.
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

token_data = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
    # Client-credentials tokens are requested for the resource's ".default"
    # scope; for Microsoft Graph the resource is https://graph.microsoft.com.
    "scope": "https://graph.microsoft.com/.default",
}

# Exchange the client credentials for a bearer token; any non-200 answer is
# surfaced with the response body so the failure reason is visible.
response = requests.post(token_url, data=token_data)
if response.status_code == 200:
    access_token = response.json().get("access_token")
else:
    raise Exception(f"Erro ao obter token: {response.text}")

# secureScores returns a list of score snapshots, not a single score.
# Summing or averaging over all of them mixes days together, while Microsoft
# Defender shows the most recent one, so only one snapshot is requested.
# NOTE(review): relies on Graph returning the newest snapshot first, as in the
# documented "$top=1" example — confirm against the tenant's response.
api_url = "https://graph.microsoft.com/v1.0/security/secureScores?$top=1"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

api_response = requests.get(api_url, headers=headers)
if api_response.status_code == 200:
    data = api_response.json().get("value", [])
else:
    raise Exception(f"Erro ao chamar API: {api_response.text}")

# Without rows there is no DataFrame to build; stop here with a clear message
# instead of failing later with a NameError on the undefined `df`.
if not data:
    raise Exception("Nenhum dado retornado pela API.")

df = spark.createDataFrame(data)

# Serialise every map/array column to a JSON string so that nested fields can
# be re-parsed afterwards with an explicit schema.
# NOTE(review): MapType, ArrayType, to_json and col are not imported in this
# snippet — presumably imported in an earlier notebook cell; confirm.
for field in df.schema.fields:
    if isinstance(field.dataType, (MapType, ArrayType)):
        df = df.withColumn(field.name, to_json(col(field.name)))


# Fields read from each element of the JSON-encoded "controlScores" array.
# All of them are parsed as nullable strings.
control_score_fields = [
    "controlCategory",
    "id",
    "lastSynced",
    "score",
    "implementationStatus",
    "controlName",
    "description",
    "scoreInPercentage",
]

control_scores_schema = ArrayType(
    StructType(
        [StructField(field_name, StringType(), True) for field_name in control_score_fields]
    )
)

# Replace the JSON string column with its parsed array-of-struct form.
parsed_control_scores = from_json(col("controlScores"), control_scores_schema)
df_1 = df.withColumn("controlScores", parsed_control_scores)

# One output row per element of the "controlScores" array.
exploded_control = explode("controlScores")
df_2 = df_1.withColumn("controlScore", exploded_control)

# Snapshot-level columns are kept as-is; control-level fields are pulled out
# of the exploded struct and exposed under their own names.
snapshot_columns = ['createdDateTime', 'currentScore', 'maxScore']
control_fields = [
    "lastSynced",
    "controlCategory",
    "controlName",
    "description",
    "scoreInPercentage",
    "id",
    "score",
]

df_3 = df_2.select(
    *[col(column_name) for column_name in snapshot_columns],
    *[col(f"controlScore.{field_name}").alias(field_name) for field_name in control_fields],
)

# Per-category secure score: sum of achieved points divided by the sum of
# achievable points, expressed as a percentage.

# `categories` is never assigned anywhere in this script, so build it from the
# control categories actually present in the exploded data.
categories = sorted(
    row["controlCategory"]
    for row in df_3.select("controlCategory").distinct().collect()
    if row["controlCategory"] is not None
)

results = {}

# control id -> maxScore taken from its control profile, so each profile is
# requested at most once even when the same control shows up in several rows.
profile_max_scores = {}

for category in categories:
    max_score_total = 0
    score_total = 0

    control_category = df_3.filter(df_3["controlCategory"] == category).collect()

    for control in control_category:
        # Both fields were parsed as strings; missing values count as 0.
        score = float(control["score"]) if control["score"] is not None else 0.0
        score_in_percentage = float(control["scoreInPercentage"]) if control["scoreInPercentage"] is not None else 0.0

        if score_in_percentage == 0:
            # With 0 % achieved the maximum cannot be derived from the score,
            # so it is looked up on the control's profile instead.
            control_id = control["id"]

            if control_id not in profile_max_scores:
                # Absolute Graph URL; a host-less path is rejected by requests.
                control_profile_url = (
                    "https://graph.microsoft.com/v1.0/security/"
                    f"secureScoreControlProfiles/{control_id}"
                )
                control_profile_response = requests.get(control_profile_url, headers=headers)

                fetched_max_score = None
                if control_profile_response.status_code == 200:
                    fetched_max_score = control_profile_response.json().get("maxScore")

                # Fall back to 1 when the lookup fails or the profile carries
                # no usable maxScore (a null value would break the sum below).
                profile_max_scores[control_id] = (
                    fetched_max_score if fetched_max_score is not None else 1
                )

            max_score = profile_max_scores[control_id]
        else:
            # score = max_score * percentage / 100, solved for max_score.
            max_score = score / (score_in_percentage * 0.01)

        max_score_total += max_score
        score_total += score

    per_category = (score_total / max_score_total) * 100 if max_score_total > 0 else 0

    results[category] = {
        "Score Total": score_total,
        "Max Score Total": max_score_total,
        "Percentual": per_category,
    }

print(json.dumps(results, indent=4))

本文标签: azure · How to correctly calculate Microsoft Secure Score with PySpark and Graph API · Stack Overflow