Hello team. I have built my server using ktor and ...
# kmongo
i
Hello team. I have built my server using ktor and mongo db. Being a novice at mongoDb i haven't quite leveraged on mongo db aggregation and lookups for optimization. I need assistance in changing my function to using aggregation but i a experiencing an error. This is what i currently work with
Copy code
Then current function
override suspend fun getRequest(requestId: String): Request? {
    val requestDto = requestsDto.findOneById(id = requestId) ?: return null

    // Here we get the client details as AuthRequest
    val client = users.findOneById(requestDto.user_id) ?: return null

    val clientDetails = AuthRequest(
        user_id = requestDto.user_id,
        first_name = client.first_name ?: "",
        last_name = client.last_name ?: "",
        phone_number = client.phone_number ?: "",
        isPhone_number_verified = client.isPhone_number_verified,
        address = client.address ?: "",
        gender = client.gender ?: "",
        email = client.email ?: "",
        role = client.role ?: "",
        withdrawStatus = client.withdrawStatus ?: "",
        walletId = client.walletId ?: "",
        walletBalance = client.walletBalance ?: 0.0,
        mySecuredAmount = client.mySecuredAmount ?: 0.0,
        referralId = client.referralId ?: "",
        isFirstTimeRequesting = client.isFirstTimeRequesting,
        verified_identity_url = client.verified_identity_url ?: "",
        verification_level1_status = client.verification_level1_status,
        verification_level2_status = client.verification_level2_status,
        verification_level3_status = client.verification_level3_status,
        verification_level4_status = client.verification_level4_status,
        verification_level5_status = client.verification_level5_status,
        profile_url = client.profile_url ?: "",
        client_average_rating = client.client_average_rating ?: 0.0,
        client_complete_tasks = client.client_complete_tasks ?: 0,
        isTasker = client.isTasker,
        tasker_application_status = client.tasker_application_status,
        latitude = client.latitude ?: 0.0,
        longitude = client.longitude ?: 0.0,
        password = "",
    )

    // Here we fetch the details of the tasker from our user documents
    val taskerClient = users.findOneById(requestDto.tasker_id) ?: return null

    val taskerClientDetails = AuthRequest(
        user_id = requestDto.tasker_id,
        first_name = taskerClient.first_name ?: "",
        last_name = taskerClient.last_name ?: "",
        phone_number = taskerClient.phone_number ?: "",
        isPhone_number_verified = taskerClient.isPhone_number_verified,
        address = taskerClient.address ?: "",
        gender = taskerClient.gender ?: "",
        email = taskerClient.email ?: "",
        role = taskerClient.role ?: "",
        withdrawStatus = taskerClient.withdrawStatus ?: "",
        walletId = taskerClient.walletId ?: "",
        walletBalance = taskerClient.walletBalance ?: 0.0,
        mySecuredAmount = taskerClient.mySecuredAmount ?: 0.0,
        referralId = taskerClient.referralId ?: "",
        isFirstTimeRequesting = taskerClient.isFirstTimeRequesting,
        verified_identity_url = taskerClient.verified_identity_url ?: "",
        verification_level1_status = taskerClient.verification_level1_status,
        verification_level2_status = taskerClient.verification_level2_status,
        verification_level3_status = taskerClient.verification_level3_status,
        verification_level4_status = taskerClient.verification_level4_status,
        verification_level5_status = taskerClient.verification_level5_status,
        profile_url = taskerClient.profile_url ?: "",
        client_average_rating = taskerClient.client_average_rating ?: 0.0,
        client_complete_tasks = taskerClient.client_complete_tasks ?: 0,
        isTasker = taskerClient.isTasker,
        tasker_application_status = taskerClient.tasker_application_status,
        latitude = taskerClient.latitude ?: 0.0,
        longitude = taskerClient.longitude ?: 0.0,
        password = "",
    )

    // Here we fetch the tasker details from the Mongo collection
    val taskerDto = taskersDto.findOneById(requestDto.tasker_id) ?: return null
    val tasker = Tasker(
        tasker_id = requestDto.tasker_id,
        user = taskerClientDetails,
        job_images = taskerDto.job_images ?: emptyList(),
        is_approved = taskerDto.is_approved,
        is_accepting_requests = taskerDto.is_accepting_requests,
        tasker_reg_date = taskerDto.tasker_reg_date,
        is_favorite = taskerDto.is_favorite,
        taskerCancelledTasks = taskerDto.taskerCancelledTasks,
        tasker_average_rating = taskerDto.tasker_average_rating ?: 0.0,
        respond_time = taskerDto.respond_time ?: "",
        tasker_complete_tasks = taskerDto.tasker_complete_tasks ?: 0,
        tasker_about = taskerDto.tasker_about ?: "",
        passport_photo = taskerDto.passport_photo ?: "",
        id_card_front = taskerDto.id_card_front ?: "",
        id_card_back = taskerDto.id_card_back ?: "",
        distance = taskerDto.distance ?: 0.0
    )

    return Request(
        request_id = requestDto.request_id,
        user = clientDetails,
        tasker = tasker,
        requested_skill = requestDto.requested_skill ?: "",
        scheduled_date = requestDto.scheduled_date ?: "",
        scheduled_time = requestDto.scheduled_time ?: "",
        task_location = requestDto.task_location ?: "",
        task_description = requestDto.task_description ?: "",
        task_distance = requestDto.task_distance ?: 0.0,
        date_of_request = requestDto.date_of_request ?: 0,
        task_images = requestDto.task_images ?: emptyList(),
        notification_status = requestDto.notification_status ?: "",
        date_of_task_notification = requestDto.date_of_task_notification ?: 0,
        receipt_no = requestDto.receipt_no ?: "",
        isNotificationSeenByClient = requestDto.isNotificationSeenByClient,
        isNotificationSeenByTasker = requestDto.isNotificationSeenByTasker,
        task_latitude = requestDto.task_latitude ?: 0.0,
        task_longitude = requestDto.task_longitude ?: 0.0,
    )
}
Copy code
// This is my aggregated function that I need assistance with. It currently has an error and i can't figure it out.

override suspend fun getRequest(requestId: String): Request? {

    // Define the aggregation pipeline
          val pipeline = listOf(
            // Match the request by requestId
            match(eq("_id", requestId)),

        // Lookup client details from the users collection
        lookup(
            from = "users",
            localField = "user_id",
            foreignField = "_id",
            newAs = "client"
        ),
        // Unwind the client array
        unwind("\$client"),

        // Lookup tasker details from the users collection
        lookup(
            from = "users",
            localField = "tasker_id",
            foreignField = "_id",
            newAs = "tasker_user"
        ),
        // Unwind the tasker_user array
        unwind("\$tasker_user"),

        // Lookup tasker metadata from the taskers collection
        lookup(
            from = "taskers",
            localField = "tasker_id",
            foreignField = "_id",
            newAs = "tasker_metadata"
        ),
        // Unwind the tasker_metadata array
        unwind("\$tasker_metadata"),

        // Project the required fields
        project(
            fields(
                Request::request_id from RequestDTO::request_id,
                Request::user from "\$client",
                Request::tasker from mapOf(
                    Tasker::tasker_id to "\$tasker_metadata._id",

                    Tasker::user to "\$tasker_user",
                    Tasker::job_images to "\$tasker_metadata.job_images",
                    Tasker::is_approved to "\$tasker_metadata.is_approved",
                    Tasker::is_accepting_requests to "\$tasker_metadata.is_accepting_requests",
                    Tasker::tasker_reg_date to "\$tasker_metadata.tasker_reg_date",
                    Tasker::is_favorite to "\$tasker_metadata.is_favorite",
                    Tasker::taskerCancelledTasks to "\$tasker_metadata.taskerCancelledTasks",
                    Tasker::tasker_average_rating to "\$tasker_metadata.tasker_average_rating",
                    Tasker::respond_time to "\$tasker_metadata.respond_time",
                    Tasker::tasker_complete_tasks to "\$tasker_metadata.tasker_complete_tasks",
                    Tasker::tasker_about to "\$tasker_metadata.tasker_about",
                    Tasker::passport_photo to "\$tasker_metadata.passport_photo",
                    Tasker::id_card_front to "\$tasker_metadata.id_card_front",
                    Tasker::id_card_back to "\$tasker_metadata.id_card_back",
                    Tasker::distance to "\$tasker_metadata.distance",
                ),
                Request::requested_skill from RequestDTO::requested_skill,
                Request::scheduled_date from RequestDTO::scheduled_date,
                Request::scheduled_time from RequestDTO::scheduled_time,
                Request::task_location from RequestDTO::task_location,
                Request::task_description from RequestDTO::task_description,
                Request::task_distance from RequestDTO::task_distance,
                Request::date_of_request from RequestDTO::date_of_request,
                Request::task_images from RequestDTO::task_images,
                Request::notification_status from RequestDTO::notification_status,
                Request::date_of_task_notification from RequestDTO::date_of_task_notification,
                Request::receipt_no from RequestDTO::receipt_no,
                Request::isNotificationSeenByClient from RequestDTO::isNotificationSeenByClient,
                Request::isNotificationSeenByTasker from RequestDTO::isNotificationSeenByTasker,
                Request::task_latitude from RequestDTO::task_latitude,
                Request::task_longitude from RequestDTO::task_longitude
            )
        )
    )

    // Execute the aggregation pipeline
    return requestsDto.aggregate<Request>(pipeline).first()
}

The request module

@Serializable
data class Request(
    @BsonId
    val request_id: String = ObjectId().toString(), // Document_id

    val user: AuthRequest,
    val tasker: Tasker,
    val requested_skill: String,
    val scheduled_date: String,
    val scheduled_time: String,
    val task_location: String,
    val task_description: String,
    val task_distance: Double,
    val date_of_request: Long,
    val task_images : List<String>,
    val notification_status: String,  // Pending, Declined, Accepted, Expired, Completed $ Canceled
    val date_of_task_notification: Long, // Declined, Accepted, Expired Date, Completed $ Canceled
    val receipt_no: String,
    val isNotificationSeenByClient: Boolean,
    val isNotificationSeenByTasker: Boolean,
    val task_latitude: Double,
    val task_longitude: Double

)