কোটলিন অ্যান্ড্রয়েডে প্রবাহিত হয়

coroutines-এ, একটি ফ্লো হল এমন একটি ধরন যা ক্রমানুসারে একাধিক মান নির্গত করতে পারে, শুধুমাত্র একটি একক মান ফিরিয়ে দেয় এমন ফাংশনগুলিকে স্থগিত করার বিপরীতে। উদাহরণস্বরূপ, আপনি একটি ডাটাবেস থেকে লাইভ আপডেট পেতে একটি প্রবাহ ব্যবহার করতে পারেন।

ফ্লোগুলি কোরোটিনের উপরে তৈরি করা হয় এবং একাধিক মান প্রদান করতে পারে। একটি প্রবাহ ধারণাগতভাবে ডেটার একটি প্রবাহ যা অ্যাসিঙ্ক্রোনাসভাবে গণনা করা যেতে পারে। নির্গত মান একই ধরনের হতে হবে। উদাহরণস্বরূপ, একটি Flow<Int> হল একটি প্রবাহ যা পূর্ণসংখ্যার মান নির্গত করে।

একটি প্রবাহ একটি Iterator সাথে খুব সাদৃশ্যপূর্ণ যেটি মানগুলির একটি ক্রম তৈরি করে, তবে এটি অসিঙ্ক্রোনাসভাবে মানগুলি তৈরি এবং ব্যবহার করতে সাসপেন্ড ফাংশন ব্যবহার করে। এর মানে হল, উদাহরণস্বরূপ, প্রধান থ্রেড ব্লক না করেই প্রবাহ নিরাপদে পরবর্তী মান তৈরি করার জন্য একটি নেটওয়ার্ক অনুরোধ করতে পারে।

ডেটার প্রবাহের সাথে জড়িত তিনটি সত্তা রয়েছে:

  • একজন প্রযোজক ডেটা তৈরি করে যা স্ট্রীমে যোগ করা হয়। কোরোটিনের জন্য ধন্যবাদ, ফ্লোও অ্যাসিঙ্ক্রোনাসভাবে ডেটা তৈরি করতে পারে।
  • (ঐচ্ছিক) মধ্যস্থতাকারীরা স্ট্রীম বা প্রবাহে নির্গত প্রতিটি মান পরিবর্তন করতে পারে।
  • একজন ভোক্তা স্রোত থেকে মান গ্রহণ করে।

তথ্য প্রবাহে জড়িত সত্তা; ভোক্তা, ঐচ্ছিক মধ্যস্থতাকারী এবং প্রযোজক
চিত্র 1. ডেটা প্রবাহের সাথে জড়িত সত্তা: ভোক্তা, ঐচ্ছিক মধ্যস্থতাকারী এবং প্রযোজক।

অ্যান্ড্রয়েডে, একটি সংগ্রহস্থল সাধারণত UI ডেটার একটি প্রযোজক যার ব্যবহারকারী হিসাবে ব্যবহারকারী ইন্টারফেস (UI) থাকে যা শেষ পর্যন্ত ডেটা প্রদর্শন করে। অন্য সময়ে, UI স্তরটি ব্যবহারকারীর ইনপুট ইভেন্টগুলির একটি প্রযোজক এবং অনুক্রমের অন্যান্য স্তরগুলি সেগুলি গ্রহণ করে৷ প্রযোজক এবং ভোক্তার মধ্যে স্তরগুলি সাধারণত মধ্যস্থতাকারী হিসাবে কাজ করে যা নিম্নলিখিত স্তরের প্রয়োজনীয়তার সাথে সামঞ্জস্য করার জন্য ডেটার প্রবাহকে সংশোধন করে।

একটি প্রবাহ সৃষ্টি

ফ্লো তৈরি করতে, ফ্লো বিল্ডার API ব্যবহার করুন। flow বিল্ডার ফাংশন একটি নতুন ফ্লো তৈরি করে যেখানে আপনি emit ফাংশন ব্যবহার করে ডেটার প্রবাহে ম্যানুয়ালি নতুন মান নির্গত করতে পারেন।

নিম্নলিখিত উদাহরণে, একটি ডেটা উত্স একটি নির্দিষ্ট ব্যবধানে স্বয়ংক্রিয়ভাবে সর্বশেষ খবর নিয়ে আসে। যেহেতু একটি সাসপেন্ড ফাংশন একাধিক পরপর মান ফেরত দিতে পারে না, তাই ডেটা উৎস এই প্রয়োজনীয়তা পূরণের জন্য একটি প্রবাহ তৈরি করে এবং ফেরত দেয়। এই ক্ষেত্রে, ডেটা উত্স প্রযোজক হিসাবে কাজ করে।

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow বিল্ডার একটি coroutine মধ্যে মৃত্যুদন্ড কার্যকর করা হয়. এইভাবে, এটি একই অ্যাসিঙ্ক্রোনাস API থেকে উপকৃত হয়, তবে কিছু বিধিনিষেধ প্রযোজ্য:

  • প্রবাহ অনুক্রমিক হয়। যেহেতু প্রযোজক একটি করোটিনে থাকে, যখন একটি সাসপেন্ড ফাংশন কল করে, প্রযোজক সাসপেন্ড ফাংশনটি ফিরে না আসা পর্যন্ত স্থগিত করে। উদাহরণে, fetchLatestNews নেটওয়ার্ক অনুরোধ সম্পূর্ণ না হওয়া পর্যন্ত প্রযোজক স্থগিত করে। তবেই ফলটি স্রোতে নির্গত হয়।
  • flow নির্মাতার সাথে, প্রযোজক একটি ভিন্ন CoroutineContext থেকে মান emit করতে পারে না। অতএব, নতুন কোরোটিন তৈরি করে বা কোডের সাথে withContext ব্লক ব্যবহার করে ভিন্ন CoroutineContextemit কল করবেন না। আপনি এই ক্ষেত্রে callbackFlow এর মতো অন্যান্য ফ্লো নির্মাতা ব্যবহার করতে পারেন।

স্ট্রীম পরিবর্তন করা হচ্ছে

মধ্যস্থতাকারীরা মধ্যবর্তী অপারেটর ব্যবহার করতে পারে মানগুলি ব্যবহার না করেই ডেটার স্ট্রিম পরিবর্তন করতে। এই অপারেটরগুলি এমন ফাংশন যা, ডেটার একটি প্রবাহে প্রয়োগ করা হলে, ক্রিয়াকলাপের একটি চেইন সেট আপ করে যা ভবিষ্যতে মানগুলি গ্রাস না হওয়া পর্যন্ত কার্যকর করা হয় না। ফ্লো রেফারেন্স ডকুমেন্টেশনে মধ্যবর্তী অপারেটর সম্পর্কে আরও জানুন।

নীচের উদাহরণে, রিপোজিটরি স্তরটি View প্রদর্শিত ডেটা রূপান্তর করতে মধ্যবর্তী অপারেটর map ব্যবহার করে:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

মধ্যবর্তী অপারেটরগুলি একের পর এক প্রয়োগ করা যেতে পারে, একটি ক্রিয়াকলাপের শৃঙ্খল গঠন করে যা অলসভাবে সম্পাদিত হয় যখন একটি আইটেম প্রবাহে নির্গত হয়। মনে রাখবেন যে শুধুমাত্র একটি স্ট্রীমে একটি মধ্যবর্তী অপারেটর প্রয়োগ করা প্রবাহ সংগ্রহ শুরু করে না।

একটি প্রবাহ থেকে সংগ্রহ

মান শুনতে শুরু করতে ফ্লো ট্রিগার করতে একটি টার্মিনাল অপারেটর ব্যবহার করুন। স্ট্রীমের সমস্ত মানগুলি নির্গত হওয়ার সাথে সাথে পেতে, collect ব্যবহার করুন। আপনি অফিসিয়াল ফ্লো ডকুমেন্টেশনে টার্মিনাল অপারেটর সম্পর্কে আরও জানতে পারেন।

যেহেতু collect একটি সাসপেন্ড ফাংশন, এটি একটি কোরোটিনের মধ্যে কার্যকর করা দরকার। এটি একটি প্যারামিটার হিসাবে একটি ল্যাম্বডা নেয় যা প্রতিটি নতুন মানের উপর কল করা হয়। যেহেতু এটি একটি সাসপেন্ড ফাংশন, কল যে কোরোটিন collect তা প্রবাহ বন্ধ না হওয়া পর্যন্ত স্থগিত হতে পারে।

পূর্ববর্তী উদাহরণটি অব্যাহত রেখে, এখানে একটি ViewModel একটি সহজ বাস্তবায়ন রয়েছে যা সংগ্রহস্থল স্তর থেকে ডেটা গ্রহণ করে:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

প্রবাহ সংগ্রহ করা প্রযোজককে ট্রিগার করে যা সর্বশেষ সংবাদ রিফ্রেশ করে এবং একটি নির্দিষ্ট ব্যবধানে নেটওয়ার্ক অনুরোধের ফলাফল নির্গত করে। যেহেতু প্রযোজক সবসময় while(true) লুপের সাথে সক্রিয় থাকে, তাই ViewModel সাফ হয়ে গেলে এবং viewModelScope বাতিল হয়ে গেলে ডেটার স্ট্রিম বন্ধ হয়ে যাবে।

নিম্নলিখিত কারণে প্রবাহ সংগ্রহ বন্ধ হতে পারে:

  • যে কোরোটিন সংগ্রহ করে তা বাতিল করা হয়েছে, যেমনটি আগের উদাহরণে দেখানো হয়েছে। এটি অন্তর্নিহিত প্রযোজককেও থামিয়ে দেয়।
  • প্রযোজক আইটেম নির্গত শেষ. এই ক্ষেত্রে, ডেটার স্ট্রীম বন্ধ হয়ে যায় এবং কোরোটিন যেটিকে collect বলা হয় সেটি এক্সিকিউশন পুনরায় শুরু করে।

অন্যান্য মধ্যবর্তী অপারেটরগুলির সাথে নির্দিষ্ট না হলে প্রবাহগুলি ঠান্ডা এবং অলস হয়৷ এর মানে হল যে প্রতিবার ফ্লোতে টার্মিনাল অপারেটরকে কল করা হলে প্রযোজক কোডটি কার্যকর করা হয়। পূর্ববর্তী উদাহরণে, একাধিক ফ্লো সংগ্রাহক থাকার ফলে ডেটা উত্সটি বিভিন্ন নির্দিষ্ট ব্যবধানে একাধিকবার সর্বশেষ সংবাদ পেতে পারে। যখন একাধিক ভোক্তা একই সময়ে সংগ্রহ করে তখন একটি প্রবাহ অপ্টিমাইজ করতে এবং শেয়ার করতে, shareIn অপারেটর ব্যবহার করুন।

অপ্রত্যাশিত ব্যতিক্রম ধরা

প্রযোজকের বাস্তবায়ন তৃতীয় পক্ষের লাইব্রেরি থেকে আসতে পারে। এর মানে হল যে এটি অপ্রত্যাশিত ব্যতিক্রম নিক্ষেপ করতে পারে। এই ব্যতিক্রমগুলি পরিচালনা করতে, catch ইন্টারমিডিয়েট অপারেটর ব্যবহার করুন।

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

পূর্ববর্তী উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect ল্যাম্বডা বলা হয় না, কারণ একটি নতুন আইটেম গৃহীত হয়নি।

catch প্রবাহে আইটেম emit করতে পারে। উদাহরণ সংগ্রহস্থল স্তর পরিবর্তে ক্যাশে মান emit করতে পারে:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

এই উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect ল্যাম্বডা বলা হয়, কারণ ব্যতিক্রমের কারণে একটি নতুন আইটেম স্রোতে নির্গত হয়েছে।

একটি ভিন্ন Coroutine Context-এ কার্যকর করা হচ্ছে

ডিফল্টরূপে, একটি flow নির্মাতার প্রযোজক এটি থেকে সংগ্রহ করা coroutine-এর CoroutineContext এ সম্পাদন করে এবং পূর্বে উল্লেখ করা হয়েছে, এটি একটি ভিন্ন CoroutineContext থেকে মান emit করতে পারে না। এই আচরণ কিছু ক্ষেত্রে অবাঞ্ছিত হতে পারে। উদাহরণস্বরূপ, এই বিষয় জুড়ে ব্যবহৃত উদাহরণগুলিতে, সংগ্রহস্থল স্তরটি Dispatchers.Main এ ক্রিয়াকলাপ সম্পাদন করা উচিত নয় যা viewModelScope দ্বারা ব্যবহৃত হয়।

একটি প্রবাহের CoroutineContext পরিবর্তন করতে, মধ্যবর্তী অপারেটর flowOn ব্যবহার করুন। flowOn আপস্ট্রিম প্রবাহের CoroutineContext পরিবর্তন করে, যার অর্থ প্রযোজক এবং flowOn এর আগে (বা উপরে) প্রয়োগ করা যেকোনো মধ্যবর্তী অপারেটর। ডাউনস্ট্রিম প্রবাহ (ভোক্তার সাথে flowOn পরে মধ্যবর্তী অপারেটর) প্রভাবিত হয় না এবং প্রবাহ থেকে collect করতে ব্যবহৃত CoroutineContext এ কার্যকর হয়। যদি একাধিক flowOn অপারেটর থাকে, প্রতিটি তার বর্তমান অবস্থান থেকে আপস্ট্রিম পরিবর্তন করে।

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

এই কোডের সাহায্যে, onEach এবং map অপারেটররা defaultDispatcher ব্যবহার করে, যেখানে catch অপারেটর এবং ভোক্তা Dispatchers.Main কার্যকর করা হয়। viewModelScope দ্বারা ব্যবহৃত হয়।

যেহেতু ডাটা সোর্স লেয়ারটি I/O কাজ করছে, আপনার উচিত এমন একটি ডিসপ্যাচার ব্যবহার করা যা I/O অপারেশনের জন্য অপ্টিমাইজ করা হয়েছে:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

জেটপ্যাক লাইব্রেরিতে প্রবাহিত হয়

ফ্লো অনেক জেটপ্যাক লাইব্রেরিতে সংহত করা হয়েছে এবং এটি অ্যান্ড্রয়েড তৃতীয় পক্ষের লাইব্রেরির মধ্যে জনপ্রিয়। ফ্লো লাইভ ডেটা আপডেট এবং ডেটার অফুরন্ত স্ট্রিমগুলির জন্য একটি দুর্দান্ত ফিট।

ডাটাবেসের পরিবর্তন সম্পর্কে অবহিত হওয়ার জন্য আপনি ফ্লো উইথ রুম ব্যবহার করতে পারেন। ডেটা অ্যাক্সেস অবজেক্ট (DAO) ব্যবহার করার সময়, লাইভ আপডেট পেতে একটি Flow টাইপ ফেরত দিন।

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example সারণীতে প্রতিবার পরিবর্তন হলে, ডাটাবেসের নতুন আইটেমগুলির সাথে একটি নতুন তালিকা নির্গত হয়।

কলব্যাক-ভিত্তিক APIগুলিকে প্রবাহে রূপান্তর করুন

callbackFlow হল একটি ফ্লো নির্মাতা যা আপনাকে কলব্যাক-ভিত্তিক APIগুলিকে ফ্লোতে রূপান্তর করতে দেয়। উদাহরণ হিসেবে, Firebase Firestore Android APIs কলব্যাক ব্যবহার করে।

এই APIগুলিকে প্রবাহে রূপান্তর করতে এবং Firestore ডাটাবেস আপডেট শুনতে, আপনি নিম্নলিখিত কোডটি ব্যবহার করতে পারেন:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow বিল্ডারের বিপরীতে, callbackFlow send ফাংশন সহ একটি ভিন্ন CoroutineContext থেকে বা trySend ফাংশন সহ একটি coroutine এর বাইরে মান নির্গত করার অনুমতি দেয়।

অভ্যন্তরীণভাবে, callbackFlow একটি চ্যানেল ব্যবহার করে, যা ধারণাগতভাবে একটি ব্লকিং সারির মতো। একটি চ্যানেল একটি ক্ষমতা সহ কনফিগার করা হয়, বাফার করা যেতে পারে এমন উপাদানগুলির সর্বাধিক সংখ্যা৷ callbackFlow তৈরি করা চ্যানেলটিতে 64টি উপাদানের ডিফল্ট ক্ষমতা রয়েছে। আপনি যখন একটি পূর্ণ চ্যানেলে একটি নতুন উপাদান যুক্ত করার চেষ্টা করেন, তখন নতুন উপাদানের জন্য স্থান না পাওয়া পর্যন্ত send প্রযোজককে স্থগিত করে, যেখানে trySend চ্যানেলে উপাদান যোগ করে না এবং অবিলম্বে false ফেরত দেয়।

trySend অবিলম্বে চ্যানেলে নির্দিষ্ট উপাদান যোগ করে, শুধুমাত্র যদি এটি তার ক্ষমতা সীমাবদ্ধতা লঙ্ঘন না করে, এবং তারপর সফল ফলাফল প্রদান করে।

অতিরিক্ত প্রবাহ সম্পদ