Configurazioni dei broker
| Proprietà | Default | Descrizione | | broker.id | | Ogni broker può essere identificato con un unico ID intero non negativo; Questo id può essere usato come "nome" del broker, e la sua esistenza permette al broker di migrare su un host/porta diverso senza confondere i consumatori. Puoi scegliere qualsiasi numero tu voglia come ID, purché l'ID sia unico. | | log.dirs | /tmp/kafka-logs | Il percorso in cui Kafka memorizza i dati. Questo percorso non è unico, può essere multiplo e i percorsi devono essere separati solo da virgole; Ogni volta che viene creata una nuova partizione, viene scelta di farlo sotto il percorso che contiene il minor numero di partizioni. | | Porto | 6667 | Il server accetta la porta a cui il client si collega | | zookeeper.connect | nullo | Il formato della stringa di connessione ZooKeeper è: hostname:port, dove hostname e port sono rispettivamente host e port di un nodo nel cluster ZooKeeper. Per connettersi ad altri nodi ZooKeeper quando un host cade, puoi creare più host come segue:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper ti permette di aggiungere un percorso "chroot" per memorizzare tutti i dati kafka nel cluster sotto un percorso specifico. Quando più cluster Kafka o altre applicazioni usano lo stesso cluster ZooKeeper, puoi usare questo metodo per impostare il percorso di archiviazione dei dati. Questo può essere implementato formattando la stringa di connessione in questo modo: Nome host1:porta1,nomehost2:port2,nomehost3:porta3/chroot/path Questa configurazione memorizza tutti i dati del cluster kafka sotto il percorso /chroot/path. Nota che prima di avviare il broker, devi creare questo percorso e i consumatori devono utilizzare lo stesso formato di connessione. | | message.max.byte | 1000000 | La dimensione massima dei messaggi che un server può ricevere. È importante che le impostazioni del consumatore e del produttore riguardo a questa proprietà siano sincronizzate, altrimenti il messaggio pubblicato dal produttore sarebbe troppo ampio per il consumatore. | | num.network.threads | 3 | Il numero di thread di rete utilizzati dal server per elaborare le richieste di rete; In generale non è necessario cambiare questa proprietà. | | num.io.threads | 8 | Il numero di thread I/O utilizzati dal server per elaborare le richieste; Il numero di thread dovrebbe essere almeno uguale al numero di hard disk. | | background.threads | 4 | Il numero di thread utilizzati per l'elaborazione in background, come la cancellazione dei file; Non è necessario cambiare questa proprietà. | | queued.max.richieste | 500 | Il numero massimo di richieste che può essere messo in coda per un thread di I/O da elaborare prima che un thread di rete smetta di leggere nuove richieste. | | host.name | nullo | nome ospite del broker; Se il nome host è già impostato, il broker sarà vincolato solo a questo indirizzo; Se non c'è un'impostazione, si legherà a tutte le interfacce e pubblicherà una copia su ZK | | advertised.host.name | nullo | Se impostato, viene inviato a produttori, consumatori e altri broker come nome host del broker | | pubblicizzata.port | nullo | Questo porto viene fornito a produttori, consumatori e altri broker, ed è utilizzato per stabilire una connessione; Deve essere impostato solo se la porta effettiva e quella che il server deve binare sono diverse. | | socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF Dimensione della cache, che il server utilizza per effettuare connessioni socket | | socket.ricevimento.buffer.bytes | 100 * 1024 | SO_RCVBUFF dimensione della cache, che viene usata dal server per collegarsi ai socket | | socket.request.max.byte | 100 * 1024 * 1024 | La dimensione massima della richiesta consentita dal server; Questo eviterà overflow di server, che dovrebbero essere più piccoli della dimensione dell'heap Java | | num.partizioni | 1 | Se il numero di partizioni non viene indicato durante la creazione di un argomento, questo numero sarà il numero predefinito di partizioni sotto l'argomento. | | log.segment.bytes | 1014*1024*1024 | I log della partizione argomentata sono memorizzati in molti file in una certa directory, che dividono i log della partizione in segmenti; Questo attributo è la dimensione massima di ogni file; Quando le dimensioni raggiungono questo valore, viene creato un nuovo file. Questa impostazione può essere superata dalla base di ogni argomento. Vista Il login del link ipertestuale è visibile. | | log.roll.hours | 24 * 7 | Anche se il file non raggiunge log.segment.bytes, un nuovo file viene creato ogni volta che il momento di creazione del file raggiunge questa proprietà. Questa impostazione può anche essere sovrascritta da impostazioni a livello di tema; VistaIl login del link ipertestuale è visibile. | | log.cleanup.policy | elimina | | | log.retention.minutes e log.retention.hours | 7 giorni | Il momento in cui ogni file di registro è stato salvato prima di essere cancellato. Il tempo predefinito di risparmio dati è lo stesso per tutti gli argomenti. log.retention.minutes e log.retention.bytes sono entrambi usati per impostare la cancellazione dei file di log, indipendentemente da quale proprietà sia stata sfiorata. Questa impostazione di proprietà può essere superata quando l'argomento è praticamente definito. VistaIl login del link ipertestuale è visibile. | | log.retention.bytes | -1 | La quantità totale di dati salvati da ciascuna partizione sotto ogni argomento; Nota che questo è il limite superiore per partizione, quindi questo numero moltiplicato per il numero di partizioni corrisponde alla quantità totale di dati memorizzati per argomento. Nota anche: se sono impostati sia log.retention.hours che log.retention.bytes, superare uno dei due limiti causerà l'eliminazione di un file segmento. Nota che questa impostazione può essere sovrascritta da ciascun argomento. VistaIl login del link ipertestuale è visibile. | | log.retention.check.interval.ms | 5 minuti | Controlla l'intervallo tra i file segmentati nel log per determinare se gli attributi del file soddisfano i requisiti di cancellazione. | | log.cleaner.enable | false | Quando questa proprietà viene impostata su falsa, verrà eliminata una volta che il log è memorizzato per un tempo o dimensione massimo. Se impostato su vero, succederà quando l'attributo di salvataggio raggiunge il limite superioreIl login del link ipertestuale è visibile.。 | | log.cleaner.threads | 1 | Il numero di thread che eseguono la compressione logarista | | log.cleaner.io.max.bytes.per.secondo. | Nessuno | Il numero massimo di I/O che un pulitore di log può avere quando esegue una compattazione di log. Questa impostazione limita il pulitore per evitare interferenze con i servizi di richiesta attivi. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indicizza i log durante il processo di pulizia e riduce la dimensione della cache utilizzata. È meglio impostarlo grande per fornire una memoria sufficiente. | | log.cleaner.io.buffer.load.factor | 512*1024 | La dimensione del blocco I/O necessario per la pulizia dei tronchi. Non è necessario cambiare questa impostazione. | | log.cleaner.io.buffer.load.factor | 0.9 | il fattore di carico della tabella hash utilizzata nella pulizia dei log; Non devi cambiare questa opzione. | | log.cleaner.backoff.ms | 15000 | L'intervallo di tempo in cui il registro viene ripulito viene eseguito | | log.cleaner.min.cleanable.ratio | 0.5 | Questa configurazione controlla quanto spesso il compattatore di tronchi cerca di pulire i tronchi (presumibilmenteIl login del link ipertestuale è visibile.è aperto). Per impostazione predefinita, evita di pulire più del 50% dei tronchi. Questo rapporto è legato allo spazio massimo occupato dal log di backup (il 50% dei log è compresso al 50%). Un tasso più alto significa meno sprechi e più spazio che può essere liberato in modo più efficiente. Questa impostazione può essere sovrascritta in ogni impostazione tematica. VistaIl login del link ipertestuale è visibile.。 | | log.cleaner.delete.retention.ms | 1 giorno | tempo di conservazione; Il tempo massimo per conservare i tronchi compressi; È anche il tempo massimo per il client per consumare i messaggi, e la differenza tra log.retention.minutes è che uno controlla i dati non compressi e l'altro i dati compressi, che verranno sovrascritti dal tempo specificato quando l'argomento viene creato. | | log.index.size.max.byte | 10*1024*1024 | La dimensione massima per segmento di tronc. Si noti che se la dimensione logaritmica raggiunge questo valore, è necessario generare un nuovo segmento logaritmico anche se la dimensione non supera il limite log.segment.byte. | | log.index.interval.bytes | 4096 | Quando esegui un fetch, devi scansionare l'offset più vicino con una certa quantità di spazio; più grande è l'impostazione, meglio è; generalmente usa il valore predefinito | | log.flush.interval.messages | Long.MaxValue | il file di log si "sincronizza" con il disco prima di accumulare i messaggi. Poiché le operazioni di IO su disco sono operazioni lente, ma anche un mezzo necessario per la "affidabilità dei dati", verifica se è necessario l'intervallo di tempo per curare sul disco rigido. Se questo valore è troppo grande, porterà a un tempo di "sincronizzazione" troppo lungo (blocco IO); se questo valore è troppo basso, porterà a un lungo periodo di "fsync" (blocco IO); se questo valore è troppo piccolo, porterà a un gran numero di volte di "sincronizzazione", il che significa che la richiesta complessiva del client ha un certo ritardo, e il guasto del server fisico comporterà la perdita di messaggi senza fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Controlla se sono necessari intervalli fsync | | log.flush.interval.ms | Long.MaxValue | Questo numero viene utilizzato per controllare l'intervallo di tempo del "fsync"; se il numero di messaggi non raggiunge mai quello di messaggi fissati sul disco, ma l'intervallo di tempo dall'ultima sincronizzazione del disco raggiunge la soglia, verrà attivata anche la sincronizzazione del disco. | | log.delete.delay.ms | 60000 | Il tempo di conservazione dopo che il file è stato cancellato nell'indice generalmente non necessita di modifiche | | auto.create.topics.enable | true | Se consentire la creazione automatica degli argomenti. Se è vero, creerà automaticamente un argomento che non esiste quando produce o fetch non esiste. Altrimenti, devi usare la riga di comando per creare un argomento | | controller.socket.timeout.ms | 30000 | Il tempo di timeout del socket quando il controller di gestione delle partizioni effettua un backup. | | controller.message.queue.size | Int.MaxValue | controller-a-broker-channles | | fattore di replicazione predefinita | 1 | Il numero predefinito di copie di backup si riferisce solo agli argomenti creati automaticamente | | replica.lag.time.max.ms | 10000 | Se un seguace non invia una richiesta di recupero entro questo tempo, il leader rimuoverà il seguace dall'ISR e considererà che il seguace è stato impiccato. | | replica.lag.max.messaggi | 4000 | Se una replica ha più di questo numero di seguaci non appoggiati, il leader rimuoverà il seguace e considererà il seguace come appeso | | replica.socket.timeout.ms | 30*1000 | Timeout del leader per le richieste di rete socket durante il backup dei dati | | replica.socket.ricevi.buffer.bytes | 64*1024 | Buffer di ricezione socket quando si invia una richiesta di rete al leader durante il backup | | replica.fetch.max.byte | 1024*1024 | Il valore massimo di ogni fetch al momento del backup | | replica.fetch.min.bytes | 500 | Il tempo massimo di attesa affinché i dati raggiungano il leader quando il leader effettua una richiesta di backup | | replica.fetch.min.bytes | 1 | La dimensione minima della risposta dopo ogni recupero durante il retrocesso | | num.replica.fetchers | 1 | Il numero di thread che supportano i dati del leader | | replica.high.watermark.checkpoint.interval.ms | 5000 | Ogni replica controlla con quale frequenza viene indurita la quantità massima d'acqua | | fetch.purgatory.purge.interval.requests | 1000 | richiesta di recupero dell'intervallo di purga | | producer.purgatory.purge.interval.requests | 1000 | il produttore richiede un intervallo di purga | | zookeeper.session.timeout.ms | 6000 | Time out per la sessione di zoo. | | zookeeper.connection.timeout.ms | 6000 | Il tempo massimo che il cliente aspetta per stabilire un collegamento con il custode dello zoo | | zookeeper.sync.time.ms | 2000 | Il seguace ZK è indietro rispetto al leader ZK per molto tempo | | controlled.shutdown.enable | true | Se è possibile controllare la chiusura del broker. Se possibile, il broker sarà in grado di spostare tutti i leader su altri broker prima della chiusura. Questo riduce l'indisponibilità durante il processo di spegnimento. | | controlled.shutdown.max.ritentativi | 3 | Il numero di comandi che possono eseguire con successo uno spegnimento prima di eseguire uno spegnimento incompleto. | | controlled.shutdown.retry.backoff.ms | 5000 | Tempo di pausa tra uno spegno e l'altro | | auto.leader.rebalance.enable | true | Se ciò è vero, il controller bilancierà automaticamente la leadership dei broker sulle partizioni | | leader.squilibrio.per.broker.percentuale | 10 | Il rapporto massimo di squilibrio consentito da ciascun broker | | leader.squilibrio.controllo.intervallo.secondi | 300 | Controlla la frequenza dello squilibrio del leader | | offset.metadata.max.byte | 4096 | Permette ai client di salvare il massimo numero dei loro offset | | max.connections.per.ip | Int.MaxValue | Il numero massimo di connessioni per broker può essere effettuato a ciascun indirizzo IP | | max.connections.per.ip.overrides | | La copertura massima della connessione predefinita per IP o nome host | | connections.max.idle.ms | 600000 | Limite di timeout per le connessioni vuote | | log.roll.jitter. {ms,hours} | 0 | Il numero massimo di jitter astratti da logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | Il numero di thread che ogni directory di dati utilizza per registrare il recupero | | unclean.leader.elezione.enable | true | Indica se è possibile usare l'impostazione non-replica in ISR come leader | | delete.topic.enable | false | Possibilità di eliminare argomenti | | offsets.topic.num.partitions | 50 | Il numero di partizioni per l'argomento di offset commit. Poiché cambiare questo dopo il deployment non è attualmente supportato, consigliamo di usare un'impostazione più alta per la produzione (ad esempio, 100-200). | | offsets.topic.retention.minutes | 1440 | Gli offset esistenti da più tempo saranno segnati come cancellazione in attesa | | offsets.retention.check.interval.ms | 600000 | Il gestore degli offset controlla la frequenza degli offset obsoleti | | offsets.topic.replication.factor | 3 | Il numero di copie di backup dell'offset dell'argomento. Si consiglia impostare numeri più alti per garantire una maggiore disponibilità | | offset.topic.segment.bytes | 104857600 | cambia argomento. | | offsets.load.buffer.size | 5242880 | Questa impostazione è correlata alla dimensione del lotto ed è utilizzata quando si legge dal segmento di offset. | | offsets.commit.required.acks | -1 | Il numero di conferme deve essere impostato prima che il commit offset sia accettabile e generalmente non deve essere modificato |
| Proprietà | Default | Proprietà predefinita del server | Descrizione | | cleanup.policy | elimina | log.cleanup.policy | O "elimina" o "compatta"; Questa stringa indica come utilizzare la vecchia parte del tronco; Il metodo predefinito ("delete") scarterà la parte precedente quando si raggiunge il limite di tempo o dimensione di riciclaggio. "compatto" comprime i tronchi | | delete.retention.ms | 86400000 (24 ore) | log.cleaner.delete.retention.ms | La differenza tra log.retention.minutes è che uno controlla i dati non compressi e l'altro i dati compressi. Questa configurazione può essere sovrascritta dai parametri di pinning quando viene creato l'argomento | | flush.messages | Nessuno | log.flush.interval.messages | Questa configurazione specifica un intervallo di tempo per forzare i log fsync. Ad esempio, se questa opzione è impostata su 1, allora è richiesta la sincronizzazione dopo ogni messaggio, e se impostata su 5, la sincronizzazione è richiesta ogni 5 messaggi. In generale, si consiglia di non impostare questo valore. L'impostazione di questo parametro richiede il necessario compromesso tra "affidabilità dei dati" e "prestazioni". Se questo valore è troppo grande, si verifica un lungo periodo di "fsync" ogni volta (blocco IO), e se questo valore è troppo piccolo, si genera un gran numero di "fsync", il che significa anche che c'è un certo ritardo nella richiesta complessiva del client. Un guasto fisico del server causerà la perdita dei messaggi senza fsync. | | flush.ms | Nessuno | log.flush.interval.ms | Questa configurazione viene utilizzata per fissare l'intervallo di tempo tra il forcing dei log fsync su disco; Ad esempio, se impostato a 1000, allora è necessario il fsync ogni 1000ms. Questa opzione generalmente non è consigliata | | index.intervallo.bytes | 4096 | log.index.interval.bytes | L'impostazione predefinita garantisce che aggiungiamo un indice al messaggio ogni 4096 byte, e più indici rendono il messaggio letto più vicino, ma la dimensione dell'indice sarà aumentata. Questa opzione generalmente non è obbligatoria | | max.message.bytes | 1000000 | max.message.bytes | La dimensione massima del messaggio aggiunto kafka. Nota che se aumenti questa dimensione, devi anche aumentare la dimensione del fetch del tuo consumatore in modo che il consumatore possa recuperare i messaggi a quelle dimensioni massime. | | min.pulibile.sporco.rapporto | 0.5 | min.pulibile.sporco.rapporto | Questa configurazione controlla la frequenza con cui il compressore di log tenta di purgare i log. Per impostazione predefinita, si eviteranno logs con un tasso di compressione superiore al 50%. Questo rapporto evita il maggior spreco di spazio | | min.insync.replicas | 1 | min.insync.replicas | Quando il produttore è impostato su request.required.acks a -1, min.insync.replicas specifica il numero minimo di repliche (ogni scrittura di repica deve essere soddisfatta), e se questo numero non viene raggiunto, il produttore produrrà un'eccezione. | | retention.bytes | Nessuno | log.retention.bytes | Se si utilizza la politica di "eliminazione" di conservazione, questa configurazione si riferisce alla dimensione massima che il log può raggiungere prima di essere cancellato. Per impostazione predefinita, non c'è un limite di dimensione ma solo un limite di tempo | | retention.ms | 7 giorni | log.retention.minutes | Se usi la politica di "elimina" di conservazione, questa configurazione si riferisce al momento in cui il log è stato salvato prima del log di cancellazione. | | segment.bytes | 1GB | log.segment.bytes | In Kafka, i log log sono immagazzinati in blocchi, e questa configurazione si riferisce alla dimensione dei log divisi in blocchi | | segment.index.bytes | 10MB | log.index.size.max.byte | Questa configurazione è circa la dimensione del file indice mappato tra gli offset e le posizioni dei file; Questa configurazione generalmente non necessita di essere modificata | | segment.ms | 7 giorni | log.roll.hours | Anche se il file log chunk non raggiunge la dimensione necessaria per essere eliminata o compressa, una volta che il tempo di log raggiunge questo limite superiore, un nuovo file log chunk sarà forzato | | segment.jitter.ms | 0 | log.roll.jitter. {ms,hours} | Il jitter massimo da sottrarre da logRollTimeMillis. |
Configurazioni per consumatori
| Proprietà | Default | Descrizione | | group.id | | Una stringa che identifica in modo univoco il gruppo in cui si trova il processo consumer e, se viene impostato lo stesso ID di gruppo, significa che questi processi appartengono allo stesso gruppo di consumatori | | zookeeper.connect | | Specifica la stringa della connessione Zookeeper, il formato è hostname:port, qui host e porta sono sia host che porta del server Zookeeper, per evitare di perdere il contatto dopo che una macchina Zookeeper si spenta, puoi specificare più hostname:port, usando virgole come separazione: Nome host1:porta1,nomehost2:porta2,nomehost3:porta3 Puoi aggiungere il percorso chroot di ZooKeeper alla stringa di connessione ZooKeeper, che viene usata per memorizzare i propri dati, in un modo: Nome host1:porta1,nomehost2:port2,nomehost3:porta3/chroot/path | | consumer.id | nullo | Non è richiesta alcuna configurazione e viene generalmente generata automaticamente | | socket.timeout.ms | 30*100 | Limiti di timeout per le richieste di rete. Il vero limite di timeout è max.fetch.wait+socket.timeout.ms | | socket.ricevimento.buffer.bytes | 64*1024 | Socket viene utilizzato per ricevere la dimensione della cache delle richieste di rete | | fetch.message.max.byte | 1024*1024 | Il numero massimo di byte per messaggio di recupero per richiesta di recupero. Questi byte saranno supervisionati nella memoria utilizzata per ogni partizione, quindi questa impostazione controllerà la quantità di memoria utilizzata dal consumatore. La dimensione della richiesta di fetch deve essere almeno uguale alla dimensione massima del messaggio consentita dal server, altrimenti la dimensione del messaggio che il produttore può inviare è maggiore di quella che il consumatore può consumare. | | num.consumer.fetchers | 1 | Il numero di thread di fetch utilizzati per i dati di fetch | | auto.commit.enable | true | Se vero, lo offset del messaggio recuperato dal consumatore sarà automaticamente sincronizzato con il guardiano dello zoo. Questo offset di commit sarà utilizzato dal nuovo consumatore quando il processo si blocca | | auto.commit.interval.ms | 60*1000 | La frequenza con cui il consumatore invia lo offset al gestore dello zoo è in pochi secondi | | queued.max.messaggi.chunks | 2 | Il numero massimo di messaggi utilizzati per la cache per il consumo. Ogni chunk deve essere uguale a fetch.message.max.byte | | rebalance.max.ritentativi | 4 | Quando un nuovo consumatore viene aggiunto a un gruppo di consumatori, il gruppo di consumatori cerca di ribilanciare il numero di partizioni assegnate a ciascun consumatore. Se la collezione dei consumatori cambia, questo ribilanciamento fallisce e si rientra quando l'allocazione viene eseguita | | fetch.min.bytes | 1 | Il numero minimo di byte che il server dovrebbe restituire con ogni richiesta di fetch. Se non vengono restituiti abbastanza dati, la richiesta attende fino a quando non ne vengono restituiti sufficienti. | | fetch.wait.max.ms | 100 | Se non ci sono abbastanza dati per soddisfare fetch.min.bytes, questa configurazione si riferisce al tempo massimo che il server bloccherà prima di rispondere a una richiesta di fetch. | | rebalance.backoff.ms | 2000 | Tempo di ritiro prima di riprovare il reblance | | refresh.leader.backoff.ms | 200 | C'è un tempo di indietro da aspettare prima di cercare di determinare se il leader di una partizione abbia perso la sua leadership | | auto.offset.reset | più grande | Se non c'è un offset inizializzato in zookeeper, se offset è una risposta al seguente valore: il più piccolo: Reset automatico dell'offset al minimo offset la più grande: Reset automatico offset all'offset del massimo Altro: fa un'eccezione al consumatore | | consumer.timeout.ms | -1 | Se non è disponibile alcun messaggio, anche dopo aver aspettato un certo periodo di tempo, viene lanciata un'eccezione di timeout | | exclude.argomenti.interni | true | Se esporre ai consumatori messaggi da argomenti interni | | paritition.assignment.strategy | Distribuzione | Seleziona la policy per l'assegnazione delle partizioni al flusso di consumo, opzionalmente intervallo, roundrobin | | client.id | Valore ID di gruppo | è una stringa specifica per l'utente che aiuta a tracciare le chiamate in ogni richiesta. Dovrebbe logicamente confermare l'applicazione che ha generato la richiesta | | zookeeper.session.timeout.ms | 6000 | Limiti di timeout per le sessioni con il guardiano dello zoo. Se il consumatore non invia un messaggio cardiaco al custode dello zoo durante questo periodo, si considera che è stato riattaccato e verrà generato un rialzo | | zookeeper.connection.timeout.ms | 6000 | Il tempo massimo di attesa per un cliente per stabilire una connessione con il Custodia dello Zoo | | zookeeper.sync.time.ms | 2000 | I seguaci ZK possono rimanere indietro rispetto al leader ZK per un tempo massimo | | offsets.storage | Custode dello zoo | Luoghi utilizzati per conservare gli offset: zookeeper o kafka | | offset.channel.backoff.ms | 1000 | Il tempo di backoff per riconnettersi al canale offset o per riprovare la richiesta di fetch/commit dell'offset fallito | | offsets.channel.socket.timeout.ms | 10000 | Il limite di timeout del socket per la risposta alla risposta alla richiesta di fetch/commit durante la lettura dello offset. Questo limite di timeout viene utilizzato dalla richiesta consumerMetadata per richiedere la gestione degli offset | | offsets.commit.max.ritentativi | 5 | Il numero di volte in cui il commit offset è stato riprovato. Questo retry viene applicato solo per spostare i commit tra uno spegnimento e l'altro. Lui | | dual.commit.enabled | true | Se usi "kafka" come offsets.storage, puoi fare commit di offset su zookeeper due volte (e una volta su kafka). Questo è fondamentale quando si passa dallo stoccaggio offset basato su uno zoo-keeper a quello basato su kafka. Per ogni gruppo di consumatori, è una raccomandazione sicura disattivare questa opzione una volta completata la migrazione | | partizione.assegnazione.strategia | Distribuzione | Scegliere tra le politiche "range" e "roundrobin" come policy per l'assegnazione delle partizioni ai flussi dati dei consumatori; L'allocator di partizioni circolari alloca tutte le partizioni disponibili così come tutti i thread consumer disponibili. Assegnerà il ciclo di partizione al thread consumer. Se tutte le istanze consumer sono sottoscritte a un determinato, le partizioni sono suddivise in distribuzioni deterministiche. La strategia di allocazione a giro all'italiana è possibile solo se vengono soddisfatte le seguenti condizioni: (1) Ogni argomento ha lo stesso numero di flussi di dati per forza del consumatore. (2) La raccolta degli argomenti sottoscritti è determinata per ogni istanza di consumatori nel gruppo di consumatori. |
Configurazioni del produttore
| Proprietà | Default | Descrizione | | metadata.broker.list | | Servire bootstrap. Producer viene usato solo per ottenere metadati (argomenti, partizioni, repliche). La connessione al socket per inviare i dati effettivi sarà stabilita sulla base dei metadati restituiti. Il formato è: host1:port1,host2:port2 Questa lista può essere una sottolista di broker o un VIP che indica broker | | request.required.acks | 0 | Questa configurazione è un valore di conferma che indica quando una richiesta di prodotto è considerata completa. In particolare, quanti altri broker devono aver inviato dati ai loro log e confermato queste informazioni al loro leader. I valori tipici includono: 0: Indica che il produttore non attende mai la conferma dal broker (stesso comportamento della 0.7). Questa opzione offre la minima latenza ma allo stesso tempo il rischio maggiore (perché i dati si perdono quando il server si spegne). 1: Indica che la replica del leader ha ricevuto la conferma dei dati. Questa opzione ha una bassa latenza e garantisce che il server confermi che è stata ricevuta. -1: Il produttore riceve la conferma che tutte le repliche sincronizzate hanno ricevuto i dati. La latenza è la più elevata, tuttavia questo metodo non elimina completamente il rischio di messaggi persi, perché il numero di repliche sincronizzate può essere 1. Se vuoi assicurarti che alcune repliche ricevano dati, dovresti impostare l'opzione min.insync.replicas nelle impostazioni a livello di argomento. Leggi la documentazione del design per una discussione più approfondita. | | request.timeout.ms | 10000 | Il broker fa del suo meglio per implementare il requisito request.required.acks, altrimenti verrà inviato un errore al cliente | | producer.type | sincronizzazione | Questa opzione identifica se il messaggio viene inviato asincronamente in un thread in background. Valori corretti: (1) asincrono: invio asincrono (2) sincronizzazione: Invio sincronizzato Impostando il produttore su asincrono, possiamo elaborare le richieste in batch (il che è positivo per un throughput più alto), ma questo crea anche la possibilità che la macchina client perda dati non inviati | | serializer.class | kafka.serializer.DefaultEncoder | La categoria di serializzazione del messaggio. L'encoder predefinito inserisce un byte[] e restituisce lo stesso byte[] | | key.serializer.class | | Classe di serializzazione per parole chiave. Se questo non viene fornito, il predefinito è corrispondere al messaggio | | partitioner.class | kafka.producer.DefaultPartitioner | classe partitioner per dividere i messaggi tra sottoargomenti. Il partizionatore predefinito si basa sulla tabella hash della chiave | | compression.codec | nessuno | Questo parametro può impostare il codec per la compressione dei dati, che può essere selezionato come "nessuno", "gzip", "snappy". | | Topics.compressed | nullo | Questo parametro può essere usato per determinare se certi argomenti sono compressi. Se il codec compresso è un codec diverso da NoCompressCodec, questi codec vengono applicati ai dati specificati degli argomenti. Se l'elenco degli argomenti compressi è vuoto, applica il codec compresso specifico a tutti gli argomenti. Se il codec compresso è NoCompressionCodec, la compressione non è disponibile per tutti gli argomenti. | | message.send.max.ritentativi | 3 | Questo parametro farà sì che il produttore riprovi automaticamente le richieste di invio fallite. Questo parametro fissa il numero di tentativi. Nota: Impostare un valore non 0 causerà la ripetizione di alcuni errori di rete: causa un invio e una perdita di conferma | | retry.backoff.ms | 100 | Prima di ogni tentativo, il produttore aggiorna i metadati dell'argomento rilevante per verificare se viene assegnato il nuovo leader. Poiché la selezione del leader richiede un po' di tempo, questa opzione specifica quanto tempo il produttore deve aspettare prima di aggiornare i metadati. | | topic.metadata.refresh.interval.ms | 600*1000 | Il produttore generalmente aggiorna i metadati dell'argomento in alcuni scenari di guasto (partizione mancante, leader non disponibile, ecc.). Seguirà un ciclo regolare. Se lo imposti a un valore negativo, i metadati verranno aggiornati solo se falliscono. Se impostato a 0, i metadati vengono aggiornati dopo ogni messaggio inviato (questa opzione non è raccomandata, il sistema consuma troppo). Importante: gli aggiornamenti avvengono solo dopo l'invio del messaggio, quindi se il produttore non invia mai il messaggio, i metadati non vengono mai aggiornati. | | queue.buffering.max.ms | 5000 | L'intervallo massimo di tempo in cui l'utente memorizza i dati nella cache quando viene applicata la modalità asincrona. Ad esempio, se il messaggio è impostato su 100, i messaggi entro 100ms verranno elaborati in lotti. Questo migliorerà la velocità di passaggio, ma aumenterà la latenza a causa della cache. | | queue.buffering.max.messaggi | 10000 | Quando si utilizza la modalità asincrona, il numero massimo di messaggi non inviati che possono essere memorizzati nella coda prima del produttore deve essere bloccato o i dati devono andare persi | | batch.num.messages | 200 | Quando si usa la modalità asincrona, puoi elaborare in batch il massimo numero di messaggi. Oppure il numero di messaggi è arrivato online o queue.buffer.max.ms è arrivato, e il produttore lo elaborerà | | send.buffer.bytes | 100*1024 | Dimensione della cache di scrittura del socket | | client.id | “” | Questo client id è una stringa specifica per l'utente che viene inclusa in ogni richiesta per tracciare la chiamata, e dovrebbe logicamente poter confermare che l'applicazione ha effettuato la richiesta. |
Configurazioni del produttore
| Nome | Digitare | Default | Importanza | Descrizione | | boostrap.servers | Elenco | | alto | Gruppo host/porta per stabilire una connessione con il cluster kafka. I dati saranno caricati in modo uniforme su tutti i server, indipendentemente dal server designato per il bootstrapping. Questa lista riguarda solo gli host inizializzati (che viene utilizzata per scoprire tutti i server). Questo formato di lista:
host1:port1,host2:port2,... Poiché questi server sono usati solo per inizializzare le connessioni e scoprire tutte le appartenenze al cluster (che possono cambiare dinamicamente), questa lista non deve necessariamente contenere tutti i server (potresti voler più di un server, anche se in questo caso uno potrebbe essere fuori uso). Se non appare alcun server in questa lista, l'invio dei dati fallirà finché la lista non sarà disponibile. | | acks | Stringa | 1 | alto | Il produttore ha bisogno di un segnale dal server per confermare la ricezione dopo aver ricevuto i dati, e questa configurazione si riferisce a quanti segnali di conferma di questo tipo il produttore necessita. Questa configurazione rappresenta in realtà la disponibilità di backup dei dati. Le seguenti impostazioni sono opzioni comuni: (1) acks=0: impostato a 0 significa che il produttore non deve attendere alcuna conferma delle informazioni ricevute. La replica verrà immediatamente aggiunta al buffer del socket e considerata inviata. Non c'è garanzia che il server abbia ricevuto con successo i dati in questo caso, e il ritentativo della configurazione non funzionerà (perché il client non sa se è fallito) e l'offset del feedback sarà sempre impostato a -1. (2) acks=1: Questo significa che almeno si deve aspettare che il leader scriva con successo i dati nel log locale, ma non che tutti i follower scrivano con successo. In questo caso, se il follower non riesce a fare il backup dei dati e il leader riattacca di nuovo, il messaggio andrà perso. (3) acks=all: Questo significa che il leader deve aspettare che tutti i backup scrivano con successo i log, e questa strategia garantirà che i dati non vengano persi finché sopravviva un backup. Questa è la garanzia più forte. (4) Sono possibili anche altre impostazioni, come ack=2, che richiederanno un dato numero di ack, ma questa strategia è generalmente raramente utilizzata. | | buffer.memory | lunga | 33554432 | alto | Il produttore può essere utilizzato per memorizzare in cache la dimensione della memoria dei dati. Se i dati vengono generati più rapidamente di quanto vengano inviati al broker, il produttore bloccherà o lancerà un'eccezione, indicata da "block.on.buffer.full".
Questa impostazione sarà correlata alla memoria totale che il produttore può utilizzare, ma non è un limite rigido, poiché non tutta la memoria usata dal produttore viene utilizzata per la cache. Una parte della memoria aggiuntiva viene utilizzata per la compressione (se viene introdotta) e una parte viene impiegata per richieste di manutenzione. | | compression.type | Stringa | nessuno | alto | Producer è il tipo di compressione utilizzata per comprimere i dati. Il valore predefinito è non compresso. I valori corretti delle opzioni sono nessuno, gzip, snappy. La compressione è meglio utilizzata per l'elaborazione batch: più messaggi vengono elaborati in batch, migliori sono le prestazioni di compressione. | | Ritentativi | int | 0 | alto | Impostare un valore maggiore di 0 farà sì che il client riinvii qualsiasi dato una volta che questi dati falliscono. Si noti che questi tentativi non differiscono da quelli in cui il client riceve un errore di invio. Permette ai tentativi di cambiare potenzialmente l'ordine dei dati; se entrambi i record dei messaggi vengono inviati alla stessa partizione, il primo messaggio fallisce, il secondo messaggio appare prima del primo. | | batch.size | int | 16384 | Medio | Il produttore cercherà di battechare i record dei messaggi per ridurre il numero di richieste. Questo migliorerà le prestazioni tra client e server. Questa configurazione controlla il numero predefinito di byte dei messaggi di elaborazione batch. Non si tenta di elaborare byte dei messaggi superiori a questo conteggio. Le richieste inviate ai broker conterranno più lotti, che conterranno una richiesta per ogni partizione. I valori di batch più piccoli sono meno utilizzati e possono ridurre la velocità di produzione (0 utilizzerà solo il batch). I valori di batch più grandi sprecano più spazio di memoria, quindi devi allocare la memoria per valori di batch specifici. | | client.id | Stringa | | Medio | Questa stringa viene inviata al server quando viene effettuata una richiesta. Lo scopo è poter tracciare la fonte delle richieste per permettere ad alcune applicazioni al di fuori della lista di permessi IP/Porta di inviare informazioni. Questa app può impostare qualsiasi stringa perché non ha altro scopo funzionale se non registrare e tracciare | | linger.ms | lunga | 0 | Medio | Il gruppo di produttori aggrega eventuali messaggi che arrivano tra la richiesta e l'invio, registrando un lotto separato di richieste. Tipicamente, ciò accade solo quando il record viene generato più velocemente della velocità di invio. Tuttavia, in determinate condizioni, il cliente vorrà ridurre il numero di richieste, o addirittura un carico moderato. Questa configurazione avviene aggiungendo un piccolo ritardo: cioè, invece di inviare immediatamente un disco, il produttore attende un determinato tempo di ritardo per permettere l'invio di altri record dei messaggi, che possono essere combinati in batch. Questo può essere considerato un algoritmo simile a TCP Nagle. Questa impostazione stabilisce un limite di latenza più alto per il batching: una volta ottenuta la dimensione batch di una partizione, la invierà immediatamente indipendentemente da questa impostazione, ma se riceviamo un messaggio con un numero di byte molto più piccolo rispetto a questa, dobbiamo "indugiare" un tempo specifico per ricevere più messaggi. Questa impostazione è predefinita a 0, cioè nessun ritardo. Impostare linger.ms=5, ad esempio, ridurrà il numero di richieste, ma allo stesso tempo aumenterà il ritardo di 5ms. | | max.request.size | int | 1028576 | Medio | Il numero massimo di byte richiesti. Questa è anche una copertura efficace per la dimensione massima registrata. Nota: Il server ha un proprio override delle dimensioni dei record dei messaggi, che sono diversi da questa impostazione. Questa impostazione limita il numero di richieste che i produttori possono inviare in massa alla volta per prevenire un gran numero di richieste. | | ricevere.buffer.bytes | int | 32768 | Medio | Dimensione della ricezionecache TCP, utilizzata durante la lettura dei dati | | send.buffer.bytes | int | 131072 | Medio | Dimensione della cache di invio TCP, utilizzata per l'invio dei dati | | timeout.ms | int | 30000 | Medio | Questa opzione di configurazione controlla il tempo massimo in cui il server aspetta la conferma dai follower. Se il numero di richieste confermate non viene soddisfatto entro questo tempo, viene restituito un errore. Questo limite di timeout viene misurato lato server e non ha latenza di rete, incluse le richieste | | blocca.on.buffer.full | Booleano | true | basso | Quando la nostra cache di memoria si esaurisce, dobbiamo smettere di ricevere nuovi record di messaggi o lanciare errori. Di default, questo è impostato su vero, tuttavia alcuni blocchi potrebbero non essere giusti, quindi è meglio lanciare subito un errore. Questo vale quando impostato su falso: il produttore lancia un errore di eccezione: BufferExhaustedException se il record è stato inviato e la cache è piena | | metadata.fetch.timeout.ms | lunga | 60000 | basso | Si riferisce ai dati per la prima volta di alcuni elementi che abbiamo ottenuto. Gli elementi includono: argomento, host, partizioni. Questa configurazione si riferisce al tempo necessario affinché l'elemento si completi con successo secondo il fetch, altrimenti verrà inviata un'eccezione al client. | | metadata.max.age.ms | lunga | 300000 | basso | Il tempo in microsecondi è l'intervallo in cui costringiamo l'aggiornamento dei metadati. Anche se non vediamo cambiamenti nella leadership della partizione. | | metric.reporters | Elenco | [] | basso | Un elenco di classi utilizzate per misurare metriche. L'implementazione dell'interfaccia MetricReporter permetterà l'aggiunta di classi che cambiano man mano che vengono generate nuove metriche. JmxReporter includerà sempre un modo per registrare le statistiche JMX | | metrics.num.samples | int | 2 | basso | Il numero di campioni utilizzati per mantenere le metriche | | metrics.sample.window.ms | lunga | 30000 | basso | Il sistema Metrics mantiene un numero configurabile di campioni in una dimensione di finestra correttibile. Questa configurazione configura, ad esempio, la dimensione della finestra. Potremmo mantenere due campioni per una durata di 30 secondi. Quando una finestra viene spostata, cancelliamo e riscriviamo la finestra più vecchia | | recoonect.backoff.ms | lunga | 10 | basso | Quando la connessione fallisce, il tempo di attesa quando ci riconnettiamo. Questo evita riconnessioni ripetute dei clienti | | retry.backoff.ms | lunga | 100 | basso | Il tempo di attesa prima di provare a riprovare una richiesta di prodotti non riusciti. Evita di rimanere bloccato in un loop morto di invio-fallimento.
|
|